]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
1fb4839455ee6d7eb2e61598dc983f4cbf3c53e4
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// have spilled over into DBObjectMap.  If it exists on the file and its value
// is XATTR_NO_SPILL_OUT ("0"), the object has no xattrs in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137 {
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147 {
148 os_commit_latency.consume_next(
149 logger.get_tavg_ms(
150 l_filestore_journal_latency));
151 os_apply_latency.consume_next(
152 logger.get_tavg_ms(
153 l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159 return out << *s.parent;
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 int FileStore::get_index(const coll_t& cid, Index *index)
169 {
170 int r = index_manager.get_index(cid, basedir, index);
171 assert(!m_filestore_fail_eio || r != -EIO);
172 return r;
173 }
174
175 int FileStore::init_index(const coll_t& cid)
176 {
177 char path[PATH_MAX];
178 get_cdir(cid, path, sizeof(path));
179 int r = index_manager.init_index(cid, path, target_version);
180 assert(!m_filestore_fail_eio || r != -EIO);
181 return r;
182 }
183
184 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185 {
186 IndexedPath path2;
187 if (!path)
188 path = &path2;
189 int r, exist;
190 assert(NULL != index.index);
191 r = (index.index)->lookup(oid, path, &exist);
192 if (r < 0) {
193 assert(!m_filestore_fail_eio || r != -EIO);
194 return r;
195 }
196 if (!exist)
197 return -ENOENT;
198 return 0;
199 }
200
201 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202 {
203 FDRef fd;
204 int r = lfn_open(cid, oid, false, &fd);
205 if (r < 0)
206 return r;
207 r = ::ftruncate(**fd, length);
208 if (r < 0)
209 r = -errno;
210 if (r >= 0 && m_filestore_sloppy_crc) {
211 int rc = backend->_crc_update_truncate(**fd, length);
212 assert(rc >= 0);
213 }
214 lfn_close(fd);
215 assert(!m_filestore_fail_eio || r != -EIO);
216 return r;
217 }
218
219 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220 {
221 IndexedPath path;
222 Index index;
223 int r = get_index(cid, &index);
224 if (r < 0)
225 return r;
226
227 assert(NULL != index.index);
228 RWLock::RLocker l((index.index)->access_lock);
229
230 r = lfn_find(oid, index, &path);
231 if (r < 0)
232 return r;
233 r = ::stat(path->path(), buf);
234 if (r < 0)
235 r = -errno;
236 return r;
237 }
238
239 int FileStore::lfn_open(const coll_t& cid,
240 const ghobject_t& oid,
241 bool create,
242 FDRef *outfd,
243 Index *index)
244 {
245 assert(outfd);
246 int r = 0;
247 bool need_lock = true;
248 int flags = O_RDWR;
249
250 if (create)
251 flags |= O_CREAT;
252 if (cct->_conf->filestore_odsync_write) {
253 flags |= O_DSYNC;
254 }
255
256 Index index2;
257 if (!index) {
258 index = &index2;
259 }
260 if (!((*index).index)) {
261 r = get_index(cid, index);
262 if (r < 0) {
263 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
264 return r;
265 }
266 } else {
267 need_lock = false;
268 }
269
270 int fd, exist;
271 assert(NULL != (*index).index);
272 if (need_lock) {
273 ((*index).index)->access_lock.get_write();
274 }
275 if (!replaying) {
276 *outfd = fdcache.lookup(oid);
277 if (*outfd) {
278 if (need_lock) {
279 ((*index).index)->access_lock.put_write();
280 }
281 return 0;
282 }
283 }
284
285
286 IndexedPath path2;
287 IndexedPath *path = &path2;
288
289 r = (*index)->lookup(oid, path, &exist);
290 if (r < 0) {
291 derr << "could not find " << oid << " in index: "
292 << cpp_strerror(-r) << dendl;
293 goto fail;
294 }
295
296 r = ::open((*path)->path(), flags, 0644);
297 if (r < 0) {
298 r = -errno;
299 dout(10) << "error opening file " << (*path)->path() << " with flags="
300 << flags << ": " << cpp_strerror(-r) << dendl;
301 goto fail;
302 }
303 fd = r;
304 if (create && (!exist)) {
305 r = (*index)->created(oid, (*path)->path());
306 if (r < 0) {
307 VOID_TEMP_FAILURE_RETRY(::close(fd));
308 derr << "error creating " << oid << " (" << (*path)->path()
309 << ") in index: " << cpp_strerror(-r) << dendl;
310 goto fail;
311 }
312 r = chain_fsetxattr<true, true>(
313 fd, XATTR_SPILL_OUT_NAME,
314 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
315 if (r < 0) {
316 VOID_TEMP_FAILURE_RETRY(::close(fd));
317 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
318 << "):" << cpp_strerror(-r) << dendl;
319 goto fail;
320 }
321 }
322
323 if (!replaying) {
324 bool existed;
325 *outfd = fdcache.add(oid, fd, &existed);
326 if (existed) {
327 TEMP_FAILURE_RETRY(::close(fd));
328 }
329 } else {
330 *outfd = std::make_shared<FDCache::FD>(fd);
331 }
332
333 if (need_lock) {
334 ((*index).index)->access_lock.put_write();
335 }
336
337 return 0;
338
339 fail:
340
341 if (need_lock) {
342 ((*index).index)->access_lock.put_write();
343 }
344
345 assert(!m_filestore_fail_eio || r != -EIO);
346 return r;
347 }
348
349 void FileStore::lfn_close(FDRef fd)
350 {
351 }
352
353 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
354 {
355 Index index_new, index_old;
356 IndexedPath path_new, path_old;
357 int exist;
358 int r;
359 bool index_same = false;
360 if (c < newcid) {
361 r = get_index(newcid, &index_new);
362 if (r < 0)
363 return r;
364 r = get_index(c, &index_old);
365 if (r < 0)
366 return r;
367 } else if (c == newcid) {
368 r = get_index(c, &index_old);
369 if (r < 0)
370 return r;
371 index_new = index_old;
372 index_same = true;
373 } else {
374 r = get_index(c, &index_old);
375 if (r < 0)
376 return r;
377 r = get_index(newcid, &index_new);
378 if (r < 0)
379 return r;
380 }
381
382 assert(NULL != index_old.index);
383 assert(NULL != index_new.index);
384
385 if (!index_same) {
386
387 RWLock::RLocker l1((index_old.index)->access_lock);
388
389 r = index_old->lookup(o, &path_old, &exist);
390 if (r < 0) {
391 assert(!m_filestore_fail_eio || r != -EIO);
392 return r;
393 }
394 if (!exist)
395 return -ENOENT;
396
397 RWLock::WLocker l2((index_new.index)->access_lock);
398
399 r = index_new->lookup(newoid, &path_new, &exist);
400 if (r < 0) {
401 assert(!m_filestore_fail_eio || r != -EIO);
402 return r;
403 }
404 if (exist)
405 return -EEXIST;
406
407 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
408 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
409 r = ::link(path_old->path(), path_new->path());
410 if (r < 0)
411 return -errno;
412
413 r = index_new->created(newoid, path_new->path());
414 if (r < 0) {
415 assert(!m_filestore_fail_eio || r != -EIO);
416 return r;
417 }
418 } else {
419 RWLock::WLocker l1((index_old.index)->access_lock);
420
421 r = index_old->lookup(o, &path_old, &exist);
422 if (r < 0) {
423 assert(!m_filestore_fail_eio || r != -EIO);
424 return r;
425 }
426 if (!exist)
427 return -ENOENT;
428
429 r = index_new->lookup(newoid, &path_new, &exist);
430 if (r < 0) {
431 assert(!m_filestore_fail_eio || r != -EIO);
432 return r;
433 }
434 if (exist)
435 return -EEXIST;
436
437 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
438 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
439 r = ::link(path_old->path(), path_new->path());
440 if (r < 0)
441 return -errno;
442
443 // make sure old fd for unlinked/overwritten file is gone
444 fdcache.clear(newoid);
445
446 r = index_new->created(newoid, path_new->path());
447 if (r < 0) {
448 assert(!m_filestore_fail_eio || r != -EIO);
449 return r;
450 }
451 }
452 return 0;
453 }
454
455 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456 const SequencerPosition &spos,
457 bool force_clear_omap)
458 {
459 Index index;
460 int r = get_index(cid, &index);
461 if (r < 0) {
462 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
463 return r;
464 }
465
466 assert(NULL != index.index);
467 RWLock::WLocker l((index.index)->access_lock);
468
469 {
470 IndexedPath path;
471 int hardlink;
472 r = index->lookup(o, &path, &hardlink);
473 if (r < 0) {
474 assert(!m_filestore_fail_eio || r != -EIO);
475 return r;
476 }
477
478 if (!force_clear_omap) {
479 if (hardlink == 0 || hardlink == 1) {
480 force_clear_omap = true;
481 }
482 }
483 if (force_clear_omap) {
484 dout(20) << __FUNC__ << ": clearing omap on " << o
485 << " in cid " << cid << dendl;
486 r = object_map->clear(o, &spos);
487 if (r < 0 && r != -ENOENT) {
488 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
489 assert(!m_filestore_fail_eio || r != -EIO);
490 return r;
491 }
492 if (cct->_conf->filestore_debug_inject_read_err) {
493 debug_obj_on_delete(o);
494 }
495 if (!m_disable_wbthrottle) {
496 wbthrottle.clear_object(o); // should be only non-cache ref
497 }
498 fdcache.clear(o);
499 } else {
500 /* Ensure that replay of this op doesn't result in the object_map
501 * going away.
502 */
503 if (!backend->can_checkpoint())
504 object_map->sync(&o, &spos);
505 }
506 if (hardlink == 0) {
507 if (!m_disable_wbthrottle) {
508 wbthrottle.clear_object(o); // should be only non-cache ref
509 }
510 return 0;
511 }
512 }
513 r = index->unlink(o);
514 if (r < 0) {
515 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
516 return r;
517 }
518 return 0;
519 }
520
521 FileStore::FileStore(CephContext* cct, const std::string &base,
522 const std::string &jdev, osflagbits_t flags,
523 const char *name, bool do_update) :
524 JournalingObjectStore(cct, base),
525 internal_name(name),
526 basedir(base), journalpath(jdev),
527 generic_flags(flags),
528 blk_size(0),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
531 backend(NULL),
532 index_manager(cct, do_update),
533 lock("FileStore::lock"),
534 force_sync(false),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct, sync_entry_timeo_lock),
537 stop(false), sync_thread(this),
538 fdcache(cct),
539 wbthrottle(cct),
540 next_osr_id(0),
541 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542 !cct->_conf->filestore_wbthrottle_enable),
543 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548 op_wq(this, cct->_conf->filestore_op_thread_timeout,
549 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550 logger(NULL),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561 m_filestore_fadvise(cct->_conf->filestore_fadvise),
562 do_update(do_update),
563 m_journal_dio(cct->_conf->journal_dio),
564 m_journal_aio(cct->_conf->journal_aio),
565 m_journal_force_aio(cct->_conf->journal_force_aio),
566 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573 m_fs_type(0),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
577 {
578 m_filestore_kill_at = cct->_conf->filestore_kill_at;
579 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580 ostringstream oss;
581 oss << "filestore-ondisk-" << i;
582 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583 ondisk_finishers.push_back(f);
584 }
585 for (int i = 0; i < m_apply_finisher_num; ++i) {
586 ostringstream oss;
587 oss << "filestore-apply-" << i;
588 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589 apply_finishers.push_back(f);
590 }
591
592 ostringstream oss;
593 oss << basedir << "/current";
594 current_fn = oss.str();
595
596 ostringstream sss;
597 sss << basedir << "/current/commit_op_seq";
598 current_op_seq_fn = sss.str();
599
600 ostringstream omss;
601 if (cct->_conf->filestore_omap_backend_path != "") {
602 omap_dir = cct->_conf->filestore_omap_backend_path;
603 } else {
604 omss << basedir << "/current/omap";
605 omap_dir = omss.str();
606 }
607
608 // initialize logger
609 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
616 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
617 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
618 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
619 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
620 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
621 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
622 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
623 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
624 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
625 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
626
627 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
628 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
629 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
630 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
631 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
632
633 logger = plb.create_perf_counters();
634
635 cct->get_perfcounters_collection()->add(logger);
636 cct->_conf->add_observer(this);
637
638 superblock.compat_features = get_fs_initial_compat_set();
639 }
640
641 FileStore::~FileStore()
642 {
643 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
644 delete *it;
645 *it = NULL;
646 }
647 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
648 delete *it;
649 *it = NULL;
650 }
651 cct->_conf->remove_observer(this);
652 cct->get_perfcounters_collection()->remove(logger);
653
654 if (journal)
655 journal->logger = NULL;
656 delete logger;
657
658 if (m_filestore_do_dump) {
659 dump_stop();
660 }
661 }
662
// Build the on-disk xattr name for `name` by prefixing it with "user.ceph.".
// At most len bytes (including the terminating NUL) are written to buf.
static void get_attrname(const char *name, char *buf, int len)
{
  const size_t capacity = static_cast<size_t>(len);
  (void)snprintf(buf, capacity, "user.ceph.%s", name);
}
667
// If *name starts with "user.ceph.", advance *name past that prefix and
// return true; otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // excludes the NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
676
677 void FileStore::collect_metadata(map<string,string> *pm)
678 {
679 char partition_path[PATH_MAX];
680 char dev_node[PATH_MAX];
681 int rc = 0;
682
683 (*pm)["filestore_backend"] = backend->get_name();
684 ostringstream ss;
685 ss << "0x" << std::hex << m_fs_type << std::dec;
686 (*pm)["filestore_f_type"] = ss.str();
687
688 if (cct->_conf->filestore_collect_device_partition_information) {
689 rc = get_device_by_uuid(get_fsid(), "PARTUUID", partition_path,
690 dev_node);
691 } else {
692 rc = -EINVAL;
693 }
694
695 switch (rc) {
696 case -EOPNOTSUPP:
697 case -EINVAL:
698 (*pm)["backend_filestore_partition_path"] = "unknown";
699 (*pm)["backend_filestore_dev_node"] = "unknown";
700 break;
701 case -ENODEV:
702 (*pm)["backend_filestore_partition_path"] = string(partition_path);
703 (*pm)["backend_filestore_dev_node"] = "unknown";
704 break;
705 default:
706 (*pm)["backend_filestore_partition_path"] = string(partition_path);
707 (*pm)["backend_filestore_dev_node"] = string(dev_node);
708 }
709 }
710
711 int FileStore::statfs(struct store_statfs_t *buf0)
712 {
713 struct statfs buf;
714 buf0->reset();
715 if (::statfs(basedir.c_str(), &buf) < 0) {
716 int r = -errno;
717 assert(!m_filestore_fail_eio || r != -EIO);
718 assert(r != -ENOENT);
719 return r;
720 }
721 buf0->total = buf.f_blocks * buf.f_bsize;
722 buf0->available = buf.f_bavail * buf.f_bsize;
723 // Adjust for writes pending in the journal
724 if (journal) {
725 uint64_t estimate = journal->get_journal_size_estimate();
726 if (buf0->available > estimate)
727 buf0->available -= estimate;
728 else
729 buf0->available = 0;
730 }
731 return 0;
732 }
733
734
735 void FileStore::new_journal()
736 {
737 if (journalpath.length()) {
738 dout(10) << "open_journal at " << journalpath << dendl;
739 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
740 journalpath.c_str(),
741 m_journal_dio, m_journal_aio,
742 m_journal_force_aio);
743 if (journal)
744 journal->logger = logger;
745 }
746 return;
747 }
748
749 int FileStore::dump_journal(ostream& out)
750 {
751 int r;
752
753 if (!journalpath.length())
754 return -EINVAL;
755
756 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
757 r = journal->dump(out);
758 delete journal;
759 return r;
760 }
761
762 FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
763 {
764 switch (f_type) {
765 #if defined(__linux__)
766 case BTRFS_SUPER_MAGIC:
767 return new BtrfsFileStoreBackend(fs);
768 # ifdef HAVE_LIBXFS
769 case XFS_SUPER_MAGIC:
770 return new XfsFileStoreBackend(fs);
771 # endif
772 #endif
773 #ifdef HAVE_LIBZFS
774 case ZFS_SUPER_MAGIC:
775 return new ZFSFileStoreBackend(fs);
776 #endif
777 default:
778 return new GenericFileStoreBackend(fs);
779 }
780 }
781
782 void FileStore::create_backend(long f_type)
783 {
784 m_fs_type = f_type;
785
786 assert(backend == NULL);
787 backend = FileStoreBackend::create(f_type, this);
788
789 dout(0) << "backend " << backend->get_name()
790 << " (magic 0x" << std::hex << f_type << std::dec << ")"
791 << dendl;
792
793 switch (f_type) {
794 #if defined(__linux__)
795 case BTRFS_SUPER_MAGIC:
796 if (!m_disable_wbthrottle){
797 wbthrottle.set_fs(WBThrottle::BTRFS);
798 }
799 break;
800
801 case XFS_SUPER_MAGIC:
802 // wbthrottle is constructed with fs(WBThrottle::XFS)
803 break;
804 #endif
805 }
806
807 set_xattr_limits_via_conf();
808 }
809
810 int FileStore::mkfs()
811 {
812 int ret = 0;
813 char fsid_fn[PATH_MAX];
814 char fsid_str[40];
815 uuid_d old_fsid;
816 uuid_d old_omap_fsid;
817
818 dout(1) << "mkfs in " << basedir << dendl;
819 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
820 if (basedir_fd < 0) {
821 ret = -errno;
822 derr << "mkfs failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
823 return ret;
824 }
825
826 // open+lock fsid
827 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
828 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
829 if (fsid_fd < 0) {
830 ret = -errno;
831 derr << "mkfs: failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
832 goto close_basedir_fd;
833 }
834
835 if (lock_fsid() < 0) {
836 ret = -EBUSY;
837 goto close_fsid_fd;
838 }
839
840 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
841 if (fsid.is_zero()) {
842 fsid.generate_random();
843 dout(1) << "mkfs generated fsid " << fsid << dendl;
844 } else {
845 dout(1) << "mkfs using provided fsid " << fsid << dendl;
846 }
847
848 fsid.print(fsid_str);
849 strcat(fsid_str, "\n");
850 ret = ::ftruncate(fsid_fd, 0);
851 if (ret < 0) {
852 ret = -errno;
853 derr << __FUNC__ << ": failed to truncate fsid: "
854 << cpp_strerror(ret) << dendl;
855 goto close_fsid_fd;
856 }
857 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
858 if (ret < 0) {
859 derr << __FUNC__ << ": failed to write fsid: "
860 << cpp_strerror(ret) << dendl;
861 goto close_fsid_fd;
862 }
863 if (::fsync(fsid_fd) < 0) {
864 ret = -errno;
865 derr << __FUNC__ << ": close failed: can't write fsid: "
866 << cpp_strerror(ret) << dendl;
867 goto close_fsid_fd;
868 }
869 dout(10) << "mkfs fsid is " << fsid << dendl;
870 } else {
871 if (!fsid.is_zero() && fsid != old_fsid) {
872 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
873 ret = -EINVAL;
874 goto close_fsid_fd;
875 }
876 fsid = old_fsid;
877 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
878 }
879
880 // version stamp
881 ret = write_version_stamp();
882 if (ret < 0) {
883 derr << __FUNC__ << ": write_version_stamp() failed: "
884 << cpp_strerror(ret) << dendl;
885 goto close_fsid_fd;
886 }
887
888 // superblock
889 superblock.omap_backend = cct->_conf->filestore_omap_backend;
890 ret = write_superblock();
891 if (ret < 0) {
892 derr << __FUNC__ << ": write_superblock() failed: "
893 << cpp_strerror(ret) << dendl;
894 goto close_fsid_fd;
895 }
896
897 struct statfs basefs;
898 ret = ::fstatfs(basedir_fd, &basefs);
899 if (ret < 0) {
900 ret = -errno;
901 derr << __FUNC__ << ": cannot fstatfs basedir "
902 << cpp_strerror(ret) << dendl;
903 goto close_fsid_fd;
904 }
905
906 create_backend(basefs.f_type);
907
908 ret = backend->create_current();
909 if (ret < 0) {
910 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
911 goto close_fsid_fd;
912 }
913
914 // write initial op_seq
915 {
916 uint64_t initial_seq = 0;
917 int fd = read_op_seq(&initial_seq);
918 if (fd < 0) {
919 ret = fd;
920 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
921 << cpp_strerror(ret) << dendl;
922 goto close_fsid_fd;
923 }
924 if (initial_seq == 0) {
925 ret = write_op_seq(fd, 1);
926 if (ret < 0) {
927 VOID_TEMP_FAILURE_RETRY(::close(fd));
928 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
929 << cpp_strerror(ret) << dendl;
930 goto close_fsid_fd;
931 }
932
933 if (backend->can_checkpoint()) {
934 // create snap_1 too
935 current_fd = ::open(current_fn.c_str(), O_RDONLY);
936 assert(current_fd >= 0);
937 char s[NAME_MAX];
938 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
939 ret = backend->create_checkpoint(s, NULL);
940 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
941 if (ret < 0 && ret != -EEXIST) {
942 VOID_TEMP_FAILURE_RETRY(::close(fd));
943 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
944 goto close_fsid_fd;
945 }
946 }
947 }
948 VOID_TEMP_FAILURE_RETRY(::close(fd));
949 }
950 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
951 if (ret < 0) {
952 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
953 goto close_fsid_fd;
954 }
955 // create fsid under omap
956 // open+lock fsid
957 int omap_fsid_fd;
958 char omap_fsid_fn[PATH_MAX];
959 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
960 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
961 if (omap_fsid_fd < 0) {
962 ret = -errno;
963 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
964 goto close_fsid_fd;
965 }
966
967 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
968 assert(!fsid.is_zero());
969 fsid.print(fsid_str);
970 strcat(fsid_str, "\n");
971 ret = ::ftruncate(omap_fsid_fd, 0);
972 if (ret < 0) {
973 ret = -errno;
974 derr << __FUNC__ << ": failed to truncate fsid: "
975 << cpp_strerror(ret) << dendl;
976 goto close_omap_fsid_fd;
977 }
978 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
979 if (ret < 0) {
980 derr << __FUNC__ << ": failed to write fsid: "
981 << cpp_strerror(ret) << dendl;
982 goto close_omap_fsid_fd;
983 }
984 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
985 if (::fsync(omap_fsid_fd) < 0) {
986 ret = -errno;
987 derr << __FUNC__ << ": close failed: can't write fsid: "
988 << cpp_strerror(ret) << dendl;
989 goto close_omap_fsid_fd;
990 }
991 dout(10) << "mkfs omap fsid is " << fsid << dendl;
992 } else {
993 if (fsid != old_omap_fsid) {
994 derr << __FUNC__ << ": " << omap_fsid_fn
995 << " has existed omap fsid " << old_omap_fsid
996 << " != expected osd fsid " << fsid
997 << dendl;
998 ret = -EINVAL;
999 goto close_omap_fsid_fd;
1000 }
1001 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1002 }
1003
1004 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1005
1006 // journal?
1007 ret = mkjournal();
1008 if (ret)
1009 goto close_omap_fsid_fd;
1010
1011 ret = write_meta("type", "filestore");
1012 if (ret)
1013 goto close_omap_fsid_fd;
1014
1015 dout(1) << "mkfs done in " << basedir << dendl;
1016 ret = 0;
1017
1018 close_omap_fsid_fd:
1019 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1020 close_fsid_fd:
1021 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1022 fsid_fd = -1;
1023 close_basedir_fd:
1024 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1025 delete backend;
1026 backend = NULL;
1027 return ret;
1028 }
1029
1030 int FileStore::mkjournal()
1031 {
1032 // read fsid
1033 int ret;
1034 char fn[PATH_MAX];
1035 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1036 int fd = ::open(fn, O_RDONLY, 0644);
1037 if (fd < 0) {
1038 int err = errno;
1039 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1040 return -err;
1041 }
1042 ret = read_fsid(fd, &fsid);
1043 if (ret < 0) {
1044 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1045 VOID_TEMP_FAILURE_RETRY(::close(fd));
1046 return ret;
1047 }
1048 VOID_TEMP_FAILURE_RETRY(::close(fd));
1049
1050 ret = 0;
1051
1052 new_journal();
1053 if (journal) {
1054 ret = journal->check();
1055 if (ret < 0) {
1056 ret = journal->create();
1057 if (ret)
1058 derr << __FUNC__ << ": error creating journal on " << journalpath
1059 << ": " << cpp_strerror(ret) << dendl;
1060 else
1061 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1062 }
1063 delete journal;
1064 journal = 0;
1065 }
1066 return ret;
1067 }
1068
1069 int FileStore::read_fsid(int fd, uuid_d *uuid)
1070 {
1071 char fsid_str[40];
1072 memset(fsid_str, 0, sizeof(fsid_str));
1073 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1074 if (ret < 0)
1075 return ret;
1076 if (ret == 8) {
1077 // old 64-bit fsid... mirror it.
1078 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1079 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1080 return 0;
1081 }
1082
1083 if (ret > 36)
1084 fsid_str[36] = 0;
1085 else
1086 fsid_str[ret] = 0;
1087 if (!uuid->parse(fsid_str))
1088 return -EINVAL;
1089 return 0;
1090 }
1091
1092 int FileStore::lock_fsid()
1093 {
1094 struct flock l;
1095 memset(&l, 0, sizeof(l));
1096 l.l_type = F_WRLCK;
1097 l.l_whence = SEEK_SET;
1098 l.l_start = 0;
1099 l.l_len = 0;
1100 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1101 if (r < 0) {
1102 int err = errno;
1103 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1104 << cpp_strerror(err) << dendl;
1105 return -err;
1106 }
1107 return 0;
1108 }
1109
1110 bool FileStore::test_mount_in_use()
1111 {
1112 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1113 char fn[PATH_MAX];
1114 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1115
1116 // verify fs isn't in use
1117
1118 fsid_fd = ::open(fn, O_RDWR, 0644);
1119 if (fsid_fd < 0)
1120 return 0; // no fsid, ok.
1121 bool inuse = lock_fsid() < 0;
1122 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1123 fsid_fd = -1;
1124 return inuse;
1125 }
1126
1127 bool FileStore::is_rotational()
1128 {
1129 bool rotational;
1130 if (backend) {
1131 rotational = backend->is_rotational();
1132 } else {
1133 int fd = ::open(basedir.c_str(), O_RDONLY);
1134 if (fd < 0)
1135 return true;
1136 struct statfs st;
1137 int r = ::fstatfs(fd, &st);
1138 ::close(fd);
1139 if (r < 0) {
1140 return true;
1141 }
1142 create_backend(st.f_type);
1143 rotational = backend->is_rotational();
1144 delete backend;
1145 backend = NULL;
1146 }
1147 dout(10) << __func__ << " " << (int)rotational << dendl;
1148 return rotational;
1149 }
1150
1151 int FileStore::_detect_fs()
1152 {
1153 struct statfs st;
1154 int r = ::fstatfs(basedir_fd, &st);
1155 if (r < 0)
1156 return -errno;
1157
1158 blk_size = st.f_bsize;
1159
1160 create_backend(st.f_type);
1161
1162 r = backend->detect_features();
1163 if (r < 0) {
1164 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1165 return r;
1166 }
1167
1168 // test xattrs
1169 char fn[PATH_MAX];
1170 int x = rand();
1171 int y = x+1;
1172 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1173 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1174 if (tmpfd < 0) {
1175 int ret = -errno;
1176 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1177 return ret;
1178 }
1179
1180 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1181 if (ret >= 0)
1182 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1183 if ((ret < 0) || (x != y)) {
1184 derr << "Extended attributes don't appear to work. ";
1185 if (ret)
1186 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1187 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1188 << "file system with the 'user_xattr' option." << dendl;
1189 ::unlink(fn);
1190 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1191 return -ENOTSUP;
1192 }
1193
1194 char buf[1000];
1195 memset(buf, 0, sizeof(buf)); // shut up valgrind
1196 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1197 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1198 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1199 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1200 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1201 if (ret == -ENOSPC) {
1202 dout(0) << "limited size xattrs" << dendl;
1203 }
1204 chain_fremovexattr(tmpfd, "user.test");
1205 chain_fremovexattr(tmpfd, "user.test2");
1206 chain_fremovexattr(tmpfd, "user.test3");
1207 chain_fremovexattr(tmpfd, "user.test4");
1208 chain_fremovexattr(tmpfd, "user.test5");
1209
1210 ::unlink(fn);
1211 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1212
1213 return 0;
1214 }
1215
1216 int FileStore::_sanity_check_fs()
1217 {
1218 // sanity check(s)
1219
1220 if (((int)m_filestore_journal_writeahead +
1221 (int)m_filestore_journal_parallel +
1222 (int)m_filestore_journal_trailing) > 1) {
1223 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1224 cerr << TEXT_RED
1225 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1226 << " is enabled in ceph.conf. You must choose a single journal mode."
1227 << TEXT_NORMAL << std::endl;
1228 return -EINVAL;
1229 }
1230
1231 if (!backend->can_checkpoint()) {
1232 if (!journal || !m_filestore_journal_writeahead) {
1233 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1234 cerr << TEXT_RED
1235 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1236 << " For non-btrfs volumes, a writeahead journal is required to\n"
1237 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1238 << " should include something like:\n"
1239 << " osd journal = /path/to/journal_device_or_file\n"
1240 << " filestore journal writeahead = true\n"
1241 << TEXT_NORMAL;
1242 }
1243 }
1244
1245 if (!journal) {
1246 dout(0) << "mount WARNING: no journal" << dendl;
1247 cerr << TEXT_YELLOW
1248 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1249 << " If you will not be using an osd journal, write latency may be\n"
1250 << " relatively high. It can be reduced somewhat by lowering\n"
1251 << " filestore_max_sync_interval, but lower values mean lower write\n"
1252 << " throughput, especially with spinning disks.\n"
1253 << TEXT_NORMAL;
1254 }
1255
1256 return 0;
1257 }
1258
1259 int FileStore::write_superblock()
1260 {
1261 bufferlist bl;
1262 ::encode(superblock, bl);
1263 return safe_write_file(basedir.c_str(), "superblock",
1264 bl.c_str(), bl.length());
1265 }
1266
1267 int FileStore::read_superblock()
1268 {
1269 bufferptr bp(PATH_MAX);
1270 int ret = safe_read_file(basedir.c_str(), "superblock",
1271 bp.c_str(), bp.length());
1272 if (ret < 0) {
1273 if (ret == -ENOENT) {
1274 // If the file doesn't exist write initial CompatSet
1275 return write_superblock();
1276 }
1277 return ret;
1278 }
1279
1280 bufferlist bl;
1281 bl.push_back(std::move(bp));
1282 bufferlist::iterator i = bl.begin();
1283 ::decode(superblock, i);
1284 return 0;
1285 }
1286
1287 int FileStore::update_version_stamp()
1288 {
1289 return write_version_stamp();
1290 }
1291
1292 int FileStore::version_stamp_is_valid(uint32_t *version)
1293 {
1294 bufferptr bp(PATH_MAX);
1295 int ret = safe_read_file(basedir.c_str(), "store_version",
1296 bp.c_str(), bp.length());
1297 if (ret < 0) {
1298 return ret;
1299 }
1300 bufferlist bl;
1301 bl.push_back(std::move(bp));
1302 bufferlist::iterator i = bl.begin();
1303 ::decode(*version, i);
1304 dout(10) << __FUNC__ << ": was " << *version << " vs target "
1305 << target_version << dendl;
1306 if (*version == target_version)
1307 return 1;
1308 else
1309 return 0;
1310 }
1311
1312 int FileStore::write_version_stamp()
1313 {
1314 dout(1) << __FUNC__ << ": " << target_version << dendl;
1315 bufferlist bl;
1316 ::encode(target_version, bl);
1317
1318 return safe_write_file(basedir.c_str(), "store_version",
1319 bl.c_str(), bl.length());
1320 }
1321
1322 int FileStore::upgrade()
1323 {
1324 dout(1) << __FUNC__ << dendl;
1325 uint32_t version;
1326 int r = version_stamp_is_valid(&version);
1327
1328 if (r == -ENOENT) {
1329 derr << "The store_version file doesn't exist." << dendl;
1330 return -EINVAL;
1331 }
1332 if (r < 0)
1333 return r;
1334 if (r == 1)
1335 return 0;
1336
1337 if (version < 3) {
1338 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1339 return -EINVAL;
1340 }
1341
1342 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1343 // open up DBObjectMap with the do_upgrade flag, which we already did.
1344 update_version_stamp();
1345 return 0;
1346 }
1347
1348 int FileStore::read_op_seq(uint64_t *seq)
1349 {
1350 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1351 if (op_fd < 0) {
1352 int r = -errno;
1353 assert(!m_filestore_fail_eio || r != -EIO);
1354 return r;
1355 }
1356 char s[40];
1357 memset(s, 0, sizeof(s));
1358 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1359 if (ret < 0) {
1360 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1361 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1362 assert(!m_filestore_fail_eio || ret != -EIO);
1363 return ret;
1364 }
1365 *seq = atoll(s);
1366 return op_fd;
1367 }
1368
1369 int FileStore::write_op_seq(int fd, uint64_t seq)
1370 {
1371 char s[30];
1372 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1373 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1374 if (ret < 0) {
1375 ret = -errno;
1376 assert(!m_filestore_fail_eio || ret != -EIO);
1377 }
1378 return ret;
1379 }
1380
1381 int FileStore::mount()
1382 {
1383 int ret;
1384 char buf[PATH_MAX];
1385 uint64_t initial_op_seq;
1386 uuid_d omap_fsid;
1387 set<string> cluster_snaps;
1388 CompatSet supported_compat_set = get_fs_supported_compat_set();
1389
1390 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1391
1392 ret = set_throttle_params();
1393 if (ret != 0)
1394 goto done;
1395
1396 // make sure global base dir exists
1397 if (::access(basedir.c_str(), R_OK | W_OK)) {
1398 ret = -errno;
1399 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1400 << cpp_strerror(ret) << dendl;
1401 goto done;
1402 }
1403
1404 // get fsid
1405 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1406 fsid_fd = ::open(buf, O_RDWR, 0644);
1407 if (fsid_fd < 0) {
1408 ret = -errno;
1409 derr << __FUNC__ << ": error opening '" << buf << "': "
1410 << cpp_strerror(ret) << dendl;
1411 goto done;
1412 }
1413
1414 ret = read_fsid(fsid_fd, &fsid);
1415 if (ret < 0) {
1416 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1417 << dendl;
1418 goto close_fsid_fd;
1419 }
1420
1421 if (lock_fsid() < 0) {
1422 derr << __FUNC__ << ": lock_fsid failed" << dendl;
1423 ret = -EBUSY;
1424 goto close_fsid_fd;
1425 }
1426
1427 dout(10) << "mount fsid is " << fsid << dendl;
1428
1429
1430 uint32_t version_stamp;
1431 ret = version_stamp_is_valid(&version_stamp);
1432 if (ret < 0) {
1433 derr << __FUNC__ << ": error in version_stamp_is_valid: "
1434 << cpp_strerror(ret) << dendl;
1435 goto close_fsid_fd;
1436 } else if (ret == 0) {
1437 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1438 derr << __FUNC__ << ": stale version stamp detected: "
1439 << version_stamp
1440 << ". Proceeding, do_update "
1441 << "is set, performing disk format upgrade."
1442 << dendl;
1443 do_update = true;
1444 } else {
1445 ret = -EINVAL;
1446 derr << __FUNC__ << ": stale version stamp " << version_stamp
1447 << ". Please run the FileStore update script before starting the "
1448 << "OSD, or set filestore_update_to to " << target_version
1449 << " (currently " << cct->_conf->filestore_update_to << ")"
1450 << dendl;
1451 goto close_fsid_fd;
1452 }
1453 }
1454
1455 ret = read_superblock();
1456 if (ret < 0) {
1457 goto close_fsid_fd;
1458 }
1459
1460 // Check if this FileStore supports all the necessary features to mount
1461 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1462 derr << __FUNC__ << ": Incompatible features set "
1463 << superblock.compat_features << dendl;
1464 ret = -EINVAL;
1465 goto close_fsid_fd;
1466 }
1467
1468 // open some dir handles
1469 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1470 if (basedir_fd < 0) {
1471 ret = -errno;
1472 derr << __FUNC__ << ": failed to open " << basedir << ": "
1473 << cpp_strerror(ret) << dendl;
1474 basedir_fd = -1;
1475 goto close_fsid_fd;
1476 }
1477
1478 // test for btrfs, xattrs, etc.
1479 ret = _detect_fs();
1480 if (ret < 0) {
1481 derr << __FUNC__ << ": error in _detect_fs: "
1482 << cpp_strerror(ret) << dendl;
1483 goto close_basedir_fd;
1484 }
1485
1486 {
1487 list<string> ls;
1488 ret = backend->list_checkpoints(ls);
1489 if (ret < 0) {
1490 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1491 goto close_basedir_fd;
1492 }
1493
1494 long long unsigned c, prev = 0;
1495 char clustersnap[NAME_MAX];
1496 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1497 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1498 assert(c > prev);
1499 prev = c;
1500 snaps.push_back(c);
1501 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1502 cluster_snaps.insert(*it);
1503 }
1504 }
1505
1506 if (m_osd_rollback_to_cluster_snap.length() &&
1507 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1508 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1509 ret = -ENOENT;
1510 goto close_basedir_fd;
1511 }
1512
1513 char nosnapfn[200];
1514 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1515
1516 if (backend->can_checkpoint()) {
1517 if (snaps.empty()) {
1518 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1519 } else {
1520 char s[NAME_MAX];
1521 uint64_t curr_seq = 0;
1522
1523 if (m_osd_rollback_to_cluster_snap.length()) {
1524 derr << TEXT_RED
1525 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1526 << TEXT_NORMAL
1527 << dendl;
1528 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1529 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1530 } else {
1531 {
1532 int fd = read_op_seq(&curr_seq);
1533 if (fd >= 0) {
1534 VOID_TEMP_FAILURE_RETRY(::close(fd));
1535 }
1536 }
1537 if (curr_seq)
1538 dout(10) << " current/ seq was " << curr_seq << dendl;
1539 else
1540 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1541
1542 uint64_t cp = snaps.back();
1543 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1544
1545 // if current/ is marked as non-snapshotted, refuse to roll
1546 // back (without clear direction) to avoid throwing out new
1547 // data.
1548 struct stat st;
1549 if (::stat(nosnapfn, &st) == 0) {
1550 if (!m_osd_use_stale_snap) {
1551 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1552 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1553 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1554 ret = -ENOTSUP;
1555 goto close_basedir_fd;
1556 }
1557 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1558 << ", newest snap is " << cp << dendl;
1559 cerr << TEXT_YELLOW
1560 << " ** WARNING: forcing the use of stale snapshot data **"
1561 << TEXT_NORMAL << std::endl;
1562 }
1563
1564 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1565 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1566 }
1567
1568 // drop current?
1569 ret = backend->rollback_to(s);
1570 if (ret) {
1571 derr << __FUNC__ << ": error rolling back to " << s << ": "
1572 << cpp_strerror(ret) << dendl;
1573 goto close_basedir_fd;
1574 }
1575 }
1576 }
1577 initial_op_seq = 0;
1578
1579 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1580 if (current_fd < 0) {
1581 ret = -errno;
1582 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1583 goto close_basedir_fd;
1584 }
1585
1586 assert(current_fd >= 0);
1587
1588 op_fd = read_op_seq(&initial_op_seq);
1589 if (op_fd < 0) {
1590 ret = op_fd;
1591 derr << __FUNC__ << ": read_op_seq failed" << dendl;
1592 goto close_current_fd;
1593 }
1594
1595 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1596 if (initial_op_seq == 0) {
1597 derr << "mount initial op seq is 0; something is wrong" << dendl;
1598 ret = -EINVAL;
1599 goto close_current_fd;
1600 }
1601
1602 if (!backend->can_checkpoint()) {
1603 // mark current/ as non-snapshotted so that we don't rollback away
1604 // from it.
1605 int r = ::creat(nosnapfn, 0644);
1606 if (r < 0) {
1607 ret = -errno;
1608 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1609 goto close_current_fd;
1610 }
1611 VOID_TEMP_FAILURE_RETRY(::close(r));
1612 } else {
1613 // clear nosnap marker, if present.
1614 ::unlink(nosnapfn);
1615 }
1616
1617 // check fsid with omap
1618 // get omap fsid
1619 int omap_fsid_fd;
1620 char omap_fsid_buf[PATH_MAX];
1621 struct ::stat omap_fsid_stat;
1622 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1623 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1624 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1625 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1626 << "assume as matched."
1627 << dendl;
1628 }else{
1629 // if osd_uuid exists, compares osd_uuid with fsid
1630 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1631 if (omap_fsid_fd < 0) {
1632 ret = -errno;
1633 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1634 << cpp_strerror(ret)
1635 << dendl;
1636 goto close_current_fd;
1637 }
1638 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1639 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1640 omap_fsid_fd = -1; // defensive
1641 if (ret < 0) {
1642 derr << __FUNC__ << ": error reading omap_fsid_fd"
1643 << ", omap_fsid = " << omap_fsid
1644 << cpp_strerror(ret)
1645 << dendl;
1646 goto close_current_fd;
1647 }
1648 if (fsid != omap_fsid) {
1649 derr << __FUNC__ << ": " << omap_fsid_buf
1650 << " has existed omap fsid " << omap_fsid
1651 << " != expected osd fsid " << fsid
1652 << dendl;
1653 ret = -EINVAL;
1654 goto close_current_fd;
1655 }
1656 }
1657
1658 dout(0) << "start omap initiation" << dendl;
1659 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1660 KeyValueDB * omap_store = KeyValueDB::create(cct,
1661 superblock.omap_backend,
1662 omap_dir);
1663 if (omap_store == NULL)
1664 {
1665 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1666 ret = -1;
1667 goto close_current_fd;
1668 }
1669
1670 if (superblock.omap_backend == "rocksdb")
1671 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1672 else
1673 ret = omap_store->init();
1674
1675 if (ret < 0) {
1676 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1677 goto close_current_fd;
1678 }
1679
1680 stringstream err;
1681 if (omap_store->create_and_open(err)) {
1682 delete omap_store;
1683 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1684 << " : " << err.str() << dendl;
1685 ret = -1;
1686 goto close_current_fd;
1687 }
1688
1689 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1690 ret = dbomap->init(do_update);
1691 if (ret < 0) {
1692 delete dbomap;
1693 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1694 goto close_current_fd;
1695 }
1696 stringstream err2;
1697
1698 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1699 derr << err2.str() << dendl;
1700 delete dbomap;
1701 ret = -EINVAL;
1702 goto close_current_fd;
1703 }
1704 object_map.reset(dbomap);
1705 }
1706
1707 // journal
1708 new_journal();
1709
1710 // select journal mode?
1711 if (journal) {
1712 if (!m_filestore_journal_writeahead &&
1713 !m_filestore_journal_parallel &&
1714 !m_filestore_journal_trailing) {
1715 if (!backend->can_checkpoint()) {
1716 m_filestore_journal_writeahead = true;
1717 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1718 } else {
1719 m_filestore_journal_parallel = true;
1720 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1721 }
1722 } else {
1723 if (m_filestore_journal_writeahead)
1724 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1725 if (m_filestore_journal_parallel)
1726 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1727 if (m_filestore_journal_trailing)
1728 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1729 }
1730 if (m_filestore_journal_writeahead)
1731 journal->set_wait_on_full(true);
1732 } else {
1733 dout(0) << __FUNC__ << ": no journal" << dendl;
1734 }
1735
1736 ret = _sanity_check_fs();
1737 if (ret) {
1738 derr << __FUNC__ << ": _sanity_check_fs failed with error "
1739 << ret << dendl;
1740 goto close_current_fd;
1741 }
1742
1743 // Cleanup possibly invalid collections
1744 {
1745 vector<coll_t> collections;
1746 ret = list_collections(collections, true);
1747 if (ret < 0) {
1748 derr << "Error " << ret << " while listing collections" << dendl;
1749 goto close_current_fd;
1750 }
1751 for (vector<coll_t>::iterator i = collections.begin();
1752 i != collections.end();
1753 ++i) {
1754 Index index;
1755 ret = get_index(*i, &index);
1756 if (ret < 0) {
1757 derr << "Unable to mount index " << *i
1758 << " with error: " << ret << dendl;
1759 goto close_current_fd;
1760 }
1761 assert(NULL != index.index);
1762 RWLock::WLocker l((index.index)->access_lock);
1763
1764 index->cleanup();
1765 }
1766 }
1767 if (!m_disable_wbthrottle) {
1768 wbthrottle.start();
1769 } else {
1770 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1771 if (cct->_conf->filestore_odsync_write) {
1772 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1773 }
1774 }
1775 sync_thread.create("filestore_sync");
1776
1777 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1778 ret = journal_replay(initial_op_seq);
1779 if (ret < 0) {
1780 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1781 if (ret == -ENOTTY) {
1782 derr << "maybe journal is not pointing to a block device and its size "
1783 << "wasn't configured?" << dendl;
1784 }
1785
1786 goto stop_sync;
1787 }
1788 }
1789
1790 {
1791 stringstream err2;
1792 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1793 derr << err2.str() << dendl;
1794 ret = -EINVAL;
1795 goto stop_sync;
1796 }
1797 }
1798
1799 init_temp_collections();
1800
1801 journal_start();
1802
1803 op_tp.start();
1804 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1805 (*it)->start();
1806 }
1807 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1808 (*it)->start();
1809 }
1810
1811 timer.init();
1812
1813 // upgrade?
1814 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1815 int err = upgrade();
1816 if (err < 0) {
1817 derr << "error converting store" << dendl;
1818 umount();
1819 return err;
1820 }
1821 }
1822
1823 // all okay.
1824 return 0;
1825
1826 stop_sync:
1827 // stop sync thread
1828 lock.Lock();
1829 stop = true;
1830 sync_cond.Signal();
1831 lock.Unlock();
1832 sync_thread.join();
1833 if (!m_disable_wbthrottle) {
1834 wbthrottle.stop();
1835 }
1836 close_current_fd:
1837 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1838 current_fd = -1;
1839 close_basedir_fd:
1840 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1841 basedir_fd = -1;
1842 close_fsid_fd:
1843 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1844 fsid_fd = -1;
1845 done:
1846 assert(!m_filestore_fail_eio || ret != -EIO);
1847 delete backend;
1848 backend = NULL;
1849 object_map.reset();
1850 return ret;
1851 }
1852
1853 void FileStore::init_temp_collections()
1854 {
1855 dout(10) << __FUNC__ << dendl;
1856 vector<coll_t> ls;
1857 int r = list_collections(ls, true);
1858 assert(r >= 0);
1859
1860 dout(20) << " ls " << ls << dendl;
1861
1862 SequencerPosition spos;
1863
1864 set<coll_t> temps;
1865 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1866 if (p->is_temp())
1867 temps.insert(*p);
1868 dout(20) << " temps " << temps << dendl;
1869
1870 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1871 if (p->is_temp())
1872 continue;
1873 if (p->is_meta())
1874 continue;
1875 coll_t temp = p->get_temp();
1876 if (temps.count(temp)) {
1877 temps.erase(temp);
1878 } else {
1879 dout(10) << __FUNC__ << ": creating " << temp << dendl;
1880 r = _create_collection(temp, 0, spos);
1881 assert(r == 0);
1882 }
1883 }
1884
1885 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1886 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
1887 r = _collection_remove_recursive(*p, spos);
1888 assert(r == 0);
1889 }
1890 }
1891
1892 int FileStore::umount()
1893 {
1894 dout(5) << __FUNC__ << ": " << basedir << dendl;
1895
1896 flush();
1897 sync();
1898 do_force_sync();
1899
1900 lock.Lock();
1901 stop = true;
1902 sync_cond.Signal();
1903 lock.Unlock();
1904 sync_thread.join();
1905 if (!m_disable_wbthrottle){
1906 wbthrottle.stop();
1907 }
1908 op_tp.stop();
1909
1910 journal_stop();
1911 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1912 journal_write_close();
1913
1914 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1915 (*it)->stop();
1916 }
1917 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1918 (*it)->stop();
1919 }
1920
1921 if (fsid_fd >= 0) {
1922 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1923 fsid_fd = -1;
1924 }
1925 if (op_fd >= 0) {
1926 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1927 op_fd = -1;
1928 }
1929 if (current_fd >= 0) {
1930 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1931 current_fd = -1;
1932 }
1933 if (basedir_fd >= 0) {
1934 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1935 basedir_fd = -1;
1936 }
1937
1938 force_sync = false;
1939
1940 delete backend;
1941 backend = NULL;
1942
1943 object_map.reset();
1944
1945 {
1946 Mutex::Locker l(sync_entry_timeo_lock);
1947 timer.shutdown();
1948 }
1949
1950 // nothing
1951 return 0;
1952 }
1953
1954
1955
1956
1957 /// -----------------------------
1958
1959 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
1960 Context *onreadable,
1961 Context *onreadable_sync,
1962 TrackedOpRef osd_op)
1963 {
1964 uint64_t bytes = 0, ops = 0;
1965 for (vector<Transaction>::iterator p = tls.begin();
1966 p != tls.end();
1967 ++p) {
1968 bytes += (*p).get_num_bytes();
1969 ops += (*p).get_num_ops();
1970 }
1971
1972 Op *o = new Op;
1973 o->start = ceph_clock_now();
1974 o->tls = std::move(tls);
1975 o->onreadable = onreadable;
1976 o->onreadable_sync = onreadable_sync;
1977 o->ops = ops;
1978 o->bytes = bytes;
1979 o->osd_op = osd_op;
1980 return o;
1981 }
1982
1983
1984
1985 void FileStore::queue_op(OpSequencer *osr, Op *o)
1986 {
1987 // queue op on sequencer, then queue sequencer for the threadpool,
1988 // so that regardless of which order the threads pick up the
1989 // sequencer, the op order will be preserved.
1990
1991 osr->queue(o);
1992 o->trace.event("queued");
1993
1994 logger->inc(l_filestore_ops);
1995 logger->inc(l_filestore_bytes, o->bytes);
1996
1997 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
1998 << " " << *osr
1999 << " " << o->bytes << " bytes"
2000 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2001 << dendl;
2002 op_wq.queue(osr);
2003 }
2004
2005 void FileStore::op_queue_reserve_throttle(Op *o)
2006 {
2007 throttle_ops.get();
2008 throttle_bytes.get(o->bytes);
2009
2010 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2011 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2012 }
2013
2014 void FileStore::op_queue_release_throttle(Op *o)
2015 {
2016 throttle_ops.put();
2017 throttle_bytes.put(o->bytes);
2018 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2019 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2020 }
2021
2022 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2023 {
2024 if (!m_disable_wbthrottle) {
2025 wbthrottle.throttle();
2026 }
2027 // inject a stall?
2028 if (cct->_conf->filestore_inject_stall) {
2029 int orig = cct->_conf->filestore_inject_stall;
2030 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2031 sleep(orig);
2032 cct->_conf->set_val("filestore_inject_stall", "0");
2033 dout(5) << __FUNC__ << ": done stalling" << dendl;
2034 }
2035
2036 osr->apply_lock.Lock();
2037 Op *o = osr->peek_queue();
2038 o->trace.event("op_apply_start");
2039 apply_manager.op_apply_start(o->op);
2040 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2041 o->trace.event("_do_transactions start");
2042 int r = _do_transactions(o->tls, o->op, &handle);
2043 o->trace.event("op_apply_finish");
2044 apply_manager.op_apply_finish(o->op);
2045 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2046 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2047
2048 o->tls.clear();
2049
2050 }
2051
2052 void FileStore::_finish_op(OpSequencer *osr)
2053 {
2054 list<Context*> to_queue;
2055 Op *o = osr->dequeue(&to_queue);
2056
2057 utime_t lat = ceph_clock_now();
2058 lat -= o->start;
2059
2060 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2061 osr->apply_lock.Unlock(); // locked in _do_op
2062 o->trace.event("_finish_op");
2063
2064 // called with tp lock held
2065 op_queue_release_throttle(o);
2066
2067 logger->tinc(l_filestore_apply_latency, lat);
2068
2069 if (o->onreadable_sync) {
2070 o->onreadable_sync->complete(0);
2071 }
2072 if (o->onreadable) {
2073 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2074 }
2075 if (!to_queue.empty()) {
2076 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2077 }
2078 delete o;
2079 }
2080
2081
2082 struct C_JournaledAhead : public Context {
2083 FileStore *fs;
2084 FileStore::OpSequencer *osr;
2085 FileStore::Op *o;
2086 Context *ondisk;
2087
2088 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2089 fs(f), osr(os), o(o), ondisk(ondisk) { }
2090 void finish(int r) override {
2091 fs->_journaled_ahead(osr, o, ondisk);
2092 }
2093 };
2094
/**
 * Submit a batch of transactions on the caller's sequencer.
 *
 * The completion contexts are first collected from tls, and posr is bound
 * to an OpSequencer (one is created the first time posr is seen).  The
 * work then follows exactly one of three paths:
 *
 *  - journal present, writeable and not in trailing mode: the entry is
 *    encoded and journaled.  In parallel mode the op is queued right away;
 *    in writeahead mode the op is queued later from C_JournaledAhead.
 *  - no journal: the op is queued directly and ondisk (if any) is
 *    registered as an apply_manager waiter.
 *  - otherwise (logged as "trailing journal"): the transactions are applied
 *    inline through do_transactions() and journaled afterwards.
 *
 * @param posr    caller's sequencer; asserted non-null
 * @param tls     transactions to submit
 * @param osd_op  tracked op; used for tracing and handed on to the journal
 * @param handle  optional thread-pool handle; its timeout is suspended
 *                around the throttle reservations
 * @return 0 on the blackhole and queued paths; the do_transactions()
 *         result on the trailing path
 */
2095 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2096 TrackedOpRef osd_op,
2097 ThreadPool::TPHandle *handle)
2098 {
2099 Context *onreadable;
2100 Context *ondisk;
2101 Context *onreadable_sync;
2102 ObjectStore::Transaction::collect_contexts(
2103 tls, &onreadable, &ondisk, &onreadable_sync);
2104
// objectstore_blackhole: drop the work, free the collected callbacks
// without completing them, and report success.
2105 if (cct->_conf->objectstore_blackhole) {
2106 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2107 << dendl;
2108 delete ondisk;
2109 delete onreadable;
2110 delete onreadable_sync;
2111 return 0;
2112 }
2113
// start of the interval reported as l_filestore_queue_transaction_latency_avg
2114 utime_t start = ceph_clock_now();
2115 // set up the sequencer
2116 OpSequencer *osr;
2117 assert(posr);
2118 if (posr->p) {
2119 osr = static_cast<OpSequencer *>(posr->p.get());
2120 dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
2121 } else {
// first use of this Sequencer: create its OpSequencer and link both ways
2122 osr = new OpSequencer(cct, ++next_osr_id);
2123 osr->set_cct(cct);
2124 osr->parent = posr;
2125 posr->p = osr;
2126 dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
2127 }
2128
2129 // used to include osr information in tracepoints during transaction apply
2130 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2131 (*i).set_osr(osr);
2132 }
2133
// tracing is only set up when the tracked op carries a pg_trace
2134 ZTracer::Trace trace;
2135 if (osd_op && osd_op->pg_trace) {
2136 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2137 trace = osd_op->store_trace;
2138 }
2139
// ---- path 1: usable journal, parallel or writeahead mode ----
2140 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2141 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2142
2143 //prepare and encode transactions data out of lock
2144 bufferlist tbl;
2145 int orig_len = journal->prepare_entry(o->tls, &tbl);
2146
// the throttle reservations below are bracketed by suspending and then
// resetting the thread-pool timeout
2147 if (handle)
2148 handle->suspend_tp_timeout();
2149
2150 op_queue_reserve_throttle(o);
2151 journal->reserve_throttle_and_backoff(tbl.length());
2152
2153 if (handle)
2154 handle->reset_tp_timeout();
2155
// op_submit_start() hands out the op number; it is paired with the
// op_submit_finish() call at the end of this path
2156 uint64_t op_num = submit_manager.op_submit_start();
2157 o->op = op_num;
2158 trace.keyval("opnum", op_num);
2159
2160 if (m_filestore_do_dump)
2161 dump_transactions(o->tls, o->op, osr);
2162
2163 if (m_filestore_journal_parallel) {
2164 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2165
2166 trace.keyval("journal mode", "parallel");
2167 trace.event("journal started");
2168 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2169
2170 // queue inside submit_manager op submission lock
2171 queue_op(osr, o);
2172 trace.event("op queued");
2173 } else if (m_filestore_journal_writeahead) {
2174 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2175
2176 osr->queue_journal(o->op);
2177
2178 trace.keyval("journal mode", "writeahead");
2179 trace.event("journal started");
// the op itself is queued later, by C_JournaledAhead -> _journaled_ahead()
2180 _op_journal_transactions(tbl, orig_len, o->op,
2181 new C_JournaledAhead(this, osr, o, ondisk),
2182 osd_op);
2183 } else {
// neither parallel nor writeahead while not trailing: not a valid configuration
2184 ceph_abort();
2185 }
2186 submit_manager.op_submit_finish(op_num);
2187 utime_t end = ceph_clock_now();
2188 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2189 return 0;
2190 }
2191
// ---- path 2: no journal at all ----
2192 if (!journal) {
2193 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2194 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2195
2196 if (handle)
2197 handle->suspend_tp_timeout();
2198
2199 op_queue_reserve_throttle(o);
2200
2201 if (handle)
2202 handle->reset_tp_timeout();
2203
2204 uint64_t op_num = submit_manager.op_submit_start();
2205 o->op = op_num;
2206
2207 if (m_filestore_do_dump)
2208 dump_transactions(o->tls, o->op, osr);
2209
2210 queue_op(osr, o);
2211 trace.keyval("opnum", op_num);
2212 trace.keyval("journal mode", "none");
2213 trace.event("op queued");
2214
// with no journal, ondisk is parked as an apply_manager waiter keyed by op_num
2215 if (ondisk)
2216 apply_manager.add_waiter(op_num, ondisk);
2217 submit_manager.op_submit_finish(op_num);
2218 utime_t end = ceph_clock_now();
2219 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2220 return 0;
2221 }
2222
// ---- path 3: journal exists but path 1 was not taken ("trailing journal") ----
2223 assert(journal);
2224 //prepare and encode transactions data out of lock
2225 bufferlist tbl;
// orig_len stays -1 when the journal is not writeable and nothing was encoded
2226 int orig_len = -1;
2227 if (journal->is_writeable()) {
2228 orig_len = journal->prepare_entry(tls, &tbl);
2229 }
2230 uint64_t op = submit_manager.op_submit_start();
2231 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2232
2233 if (m_filestore_do_dump)
2234 dump_transactions(tls, op, osr);
2235
2236 trace.event("op_apply_start");
2237 trace.keyval("opnum", op);
2238 trace.keyval("journal mode", "trailing");
2239 apply_manager.op_apply_start(op);
2240 trace.event("do_transactions");
// transactions are applied inline, on the calling thread
2241 int r = do_transactions(tls, op);
2242
// journal only after a successful apply; on failure ondisk is freed
// without being completed
2243 if (r >= 0) {
2244 trace.event("journal started");
2245 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2246 } else {
2247 delete ondisk;
2248 }
2249
2250 // start on_readable finisher after we queue journal item, as on_readable callback
2251 // is allowed to delete the Transaction
2252 if (onreadable_sync) {
2253 onreadable_sync->complete(r);
2254 }
// onreadable goes to an apply finisher chosen by sequencer id
2255 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2256
2257 submit_manager.op_submit_finish(op);
2258 trace.event("op_apply_finish");
2259 apply_manager.op_apply_finish(op);
2260
2261 utime_t end = ceph_clock_now();
2262 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2263 return r;
2264 }
2265
2266 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2267 {
2268 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2269
2270 o->trace.event("writeahead journal finished");
2271
2272 // this should queue in order because the journal does it's completions in order.
2273 queue_op(osr, o);
2274
2275 list<Context*> to_queue;
2276 osr->dequeue_journal(&to_queue);
2277
2278 // do ondisk completions async, to prevent any onreadable_sync completions
2279 // getting blocked behind an ondisk completion.
2280 if (ondisk) {
2281 dout(10) << " queueing ondisk " << ondisk << dendl;
2282 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2283 }
2284 if (!to_queue.empty()) {
2285 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2286 }
2287 }
2288
2289 int FileStore::_do_transactions(
2290 vector<Transaction> &tls,
2291 uint64_t op_seq,
2292 ThreadPool::TPHandle *handle)
2293 {
2294 int trans_num = 0;
2295
2296 for (vector<Transaction>::iterator p = tls.begin();
2297 p != tls.end();
2298 ++p, trans_num++) {
2299 _do_transaction(*p, op_seq, trans_num, handle);
2300 if (handle)
2301 handle->reset_tp_timeout();
2302 }
2303
2304 return 0;
2305 }
2306
2307 void FileStore::_set_global_replay_guard(const coll_t& cid,
2308 const SequencerPosition &spos)
2309 {
2310 if (backend->can_checkpoint())
2311 return;
2312
2313 // sync all previous operations on this sequencer
2314 int ret = object_map->sync();
2315 if (ret < 0) {
2316 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2317 assert(0 == "_set_global_replay_guard failed");
2318 }
2319 ret = sync_filesystem(basedir_fd);
2320 if (ret < 0) {
2321 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2322 assert(0 == "_set_global_replay_guard failed");
2323 }
2324
2325 char fn[PATH_MAX];
2326 get_cdir(cid, fn, sizeof(fn));
2327 int fd = ::open(fn, O_RDONLY);
2328 if (fd < 0) {
2329 int err = errno;
2330 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2331 assert(0 == "_set_global_replay_guard failed");
2332 }
2333
2334 _inject_failure();
2335
2336 // then record that we did it
2337 bufferlist v;
2338 ::encode(spos, v);
2339 int r = chain_fsetxattr<true, true>(
2340 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2341 if (r < 0) {
2342 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2343 << " got " << cpp_strerror(r) << dendl;
2344 assert(0 == "fsetxattr failed");
2345 }
2346
2347 // and make sure our xattr is durable.
2348 ::fsync(fd);
2349
2350 _inject_failure();
2351
2352 VOID_TEMP_FAILURE_RETRY(::close(fd));
2353 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2354 }
2355
2356 int FileStore::_check_global_replay_guard(const coll_t& cid,
2357 const SequencerPosition& spos)
2358 {
2359 char fn[PATH_MAX];
2360 get_cdir(cid, fn, sizeof(fn));
2361 int fd = ::open(fn, O_RDONLY);
2362 if (fd < 0) {
2363 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2364 return 1; // if collection does not exist, there is no guard, and we can replay.
2365 }
2366
2367 char buf[100];
2368 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2369 if (r < 0) {
2370 dout(20) << __FUNC__ << ": no xattr" << dendl;
2371 assert(!m_filestore_fail_eio || r != -EIO);
2372 VOID_TEMP_FAILURE_RETRY(::close(fd));
2373 return 1; // no xattr
2374 }
2375 bufferlist bl;
2376 bl.append(buf, r);
2377
2378 SequencerPosition opos;
2379 bufferlist::iterator p = bl.begin();
2380 ::decode(opos, p);
2381
2382 VOID_TEMP_FAILURE_RETRY(::close(fd));
2383 return spos >= opos ? 1 : -1;
2384 }
2385
2386
2387 void FileStore::_set_replay_guard(const coll_t& cid,
2388 const SequencerPosition &spos,
2389 bool in_progress=false)
2390 {
2391 char fn[PATH_MAX];
2392 get_cdir(cid, fn, sizeof(fn));
2393 int fd = ::open(fn, O_RDONLY);
2394 if (fd < 0) {
2395 int err = errno;
2396 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2397 assert(0 == "_set_replay_guard failed");
2398 }
2399 _set_replay_guard(fd, spos, 0, in_progress);
2400 VOID_TEMP_FAILURE_RETRY(::close(fd));
2401 }
2402
2403
2404 void FileStore::_set_replay_guard(int fd,
2405 const SequencerPosition& spos,
2406 const ghobject_t *hoid,
2407 bool in_progress)
2408 {
2409 if (backend->can_checkpoint())
2410 return;
2411
2412 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2413
2414 _inject_failure();
2415
2416 // first make sure the previous operation commits
2417 ::fsync(fd);
2418
2419 if (!in_progress) {
2420 // sync object_map too. even if this object has a header or keys,
2421 // it have had them in the past and then removed them, so always
2422 // sync.
2423 object_map->sync(hoid, &spos);
2424 }
2425
2426 _inject_failure();
2427
2428 // then record that we did it
2429 bufferlist v(40);
2430 ::encode(spos, v);
2431 ::encode(in_progress, v);
2432 int r = chain_fsetxattr<true, true>(
2433 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2434 if (r < 0) {
2435 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2436 assert(0 == "fsetxattr failed");
2437 }
2438
2439 // and make sure our xattr is durable.
2440 ::fsync(fd);
2441
2442 _inject_failure();
2443
2444 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2445 }
2446
2447 void FileStore::_close_replay_guard(const coll_t& cid,
2448 const SequencerPosition &spos)
2449 {
2450 char fn[PATH_MAX];
2451 get_cdir(cid, fn, sizeof(fn));
2452 int fd = ::open(fn, O_RDONLY);
2453 if (fd < 0) {
2454 int err = errno;
2455 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2456 assert(0 == "_close_replay_guard failed");
2457 }
2458 _close_replay_guard(fd, spos);
2459 VOID_TEMP_FAILURE_RETRY(::close(fd));
2460 }
2461
2462 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2463 const ghobject_t *hoid)
2464 {
2465 if (backend->can_checkpoint())
2466 return;
2467
2468 dout(10) << __FUNC__ << ": " << spos << dendl;
2469
2470 _inject_failure();
2471
2472 // sync object_map too. even if this object has a header or keys,
2473 // it have had them in the past and then removed them, so always
2474 // sync.
2475 object_map->sync(hoid, &spos);
2476
2477 // then record that we are done with this operation
2478 bufferlist v(40);
2479 ::encode(spos, v);
2480 bool in_progress = false;
2481 ::encode(in_progress, v);
2482 int r = chain_fsetxattr<true, true>(
2483 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2484 if (r < 0) {
2485 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2486 assert(0 == "fsetxattr failed");
2487 }
2488
2489 // and make sure our xattr is durable.
2490 ::fsync(fd);
2491
2492 _inject_failure();
2493
2494 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2495 }
2496
2497 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2498 const SequencerPosition& spos)
2499 {
2500 if (!replaying || backend->can_checkpoint())
2501 return 1;
2502
2503 int r = _check_global_replay_guard(cid, spos);
2504 if (r < 0)
2505 return r;
2506
2507 FDRef fd;
2508 r = lfn_open(cid, oid, false, &fd);
2509 if (r < 0) {
2510 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2511 return 1; // if file does not exist, there is no guard, and we can replay.
2512 }
2513 int ret = _check_replay_guard(**fd, spos);
2514 lfn_close(fd);
2515 return ret;
2516 }
2517
2518 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2519 {
2520 if (!replaying || backend->can_checkpoint())
2521 return 1;
2522
2523 char fn[PATH_MAX];
2524 get_cdir(cid, fn, sizeof(fn));
2525 int fd = ::open(fn, O_RDONLY);
2526 if (fd < 0) {
2527 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2528 return 1; // if collection does not exist, there is no guard, and we can replay.
2529 }
2530 int ret = _check_replay_guard(fd, spos);
2531 VOID_TEMP_FAILURE_RETRY(::close(fd));
2532 return ret;
2533 }
2534
2535 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2536 {
2537 if (!replaying || backend->can_checkpoint())
2538 return 1;
2539
2540 char buf[100];
2541 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2542 if (r < 0) {
2543 dout(20) << __FUNC__ << ": no xattr" << dendl;
2544 assert(!m_filestore_fail_eio || r != -EIO);
2545 return 1; // no xattr
2546 }
2547 bufferlist bl;
2548 bl.append(buf, r);
2549
2550 SequencerPosition opos;
2551 bufferlist::iterator p = bl.begin();
2552 ::decode(opos, p);
2553 bool in_progress = false;
2554 if (!p.end()) // older journals don't have this
2555 ::decode(in_progress, p);
2556 if (opos > spos) {
2557 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2558 << ", now or in future, SKIPPING REPLAY" << dendl;
2559 return -1;
2560 } else if (opos == spos) {
2561 if (in_progress) {
2562 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2563 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2564 return 0;
2565 } else {
2566 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2567 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2568 return -1;
2569 }
2570 } else {
2571 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2572 << ", in past, will replay" << dendl;
2573 return 1;
2574 }
2575 }
2576
2577 void FileStore::_do_transaction(
2578 Transaction& t, uint64_t op_seq, int trans_num,
2579 ThreadPool::TPHandle *handle)
2580 {
2581 dout(10) << __FUNC__ << ": on " << &t << dendl;
2582
2583 #ifdef WITH_LTTNG
2584 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2585 #endif
2586
2587 Transaction::iterator i = t.begin();
2588
2589 SequencerPosition spos(op_seq, trans_num, 0);
2590 while (i.have_op()) {
2591 if (handle)
2592 handle->reset_tp_timeout();
2593
2594 Transaction::Op *op = i.decode_op();
2595 int r = 0;
2596
2597 _inject_failure();
2598
2599 switch (op->op) {
2600 case Transaction::OP_NOP:
2601 break;
2602 case Transaction::OP_TOUCH:
2603 {
2604 const coll_t &_cid = i.get_cid(op->cid);
2605 const ghobject_t &oid = i.get_oid(op->oid);
2606 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2607 _cid : _cid.get_temp();
2608 tracepoint(objectstore, touch_enter, osr_name);
2609 if (_check_replay_guard(cid, oid, spos) > 0)
2610 r = _touch(cid, oid);
2611 tracepoint(objectstore, touch_exit, r);
2612 }
2613 break;
2614
2615 case Transaction::OP_WRITE:
2616 {
2617 const coll_t &_cid = i.get_cid(op->cid);
2618 const ghobject_t &oid = i.get_oid(op->oid);
2619 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2620 _cid : _cid.get_temp();
2621 uint64_t off = op->off;
2622 uint64_t len = op->len;
2623 uint32_t fadvise_flags = i.get_fadvise_flags();
2624 bufferlist bl;
2625 i.decode_bl(bl);
2626 tracepoint(objectstore, write_enter, osr_name, off, len);
2627 if (_check_replay_guard(cid, oid, spos) > 0)
2628 r = _write(cid, oid, off, len, bl, fadvise_flags);
2629 tracepoint(objectstore, write_exit, r);
2630 }
2631 break;
2632
2633 case Transaction::OP_ZERO:
2634 {
2635 const coll_t &_cid = i.get_cid(op->cid);
2636 const ghobject_t &oid = i.get_oid(op->oid);
2637 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2638 _cid : _cid.get_temp();
2639 uint64_t off = op->off;
2640 uint64_t len = op->len;
2641 tracepoint(objectstore, zero_enter, osr_name, off, len);
2642 if (_check_replay_guard(cid, oid, spos) > 0)
2643 r = _zero(cid, oid, off, len);
2644 tracepoint(objectstore, zero_exit, r);
2645 }
2646 break;
2647
2648 case Transaction::OP_TRIMCACHE:
2649 {
2650 // deprecated, no-op
2651 }
2652 break;
2653
2654 case Transaction::OP_TRUNCATE:
2655 {
2656 const coll_t &_cid = i.get_cid(op->cid);
2657 const ghobject_t &oid = i.get_oid(op->oid);
2658 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2659 _cid : _cid.get_temp();
2660 uint64_t off = op->off;
2661 tracepoint(objectstore, truncate_enter, osr_name, off);
2662 if (_check_replay_guard(cid, oid, spos) > 0)
2663 r = _truncate(cid, oid, off);
2664 tracepoint(objectstore, truncate_exit, r);
2665 }
2666 break;
2667
2668 case Transaction::OP_REMOVE:
2669 {
2670 const coll_t &_cid = i.get_cid(op->cid);
2671 const ghobject_t &oid = i.get_oid(op->oid);
2672 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2673 _cid : _cid.get_temp();
2674 tracepoint(objectstore, remove_enter, osr_name);
2675 if (_check_replay_guard(cid, oid, spos) > 0)
2676 r = _remove(cid, oid, spos);
2677 tracepoint(objectstore, remove_exit, r);
2678 }
2679 break;
2680
2681 case Transaction::OP_SETATTR:
2682 {
2683 const coll_t &_cid = i.get_cid(op->cid);
2684 const ghobject_t &oid = i.get_oid(op->oid);
2685 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2686 _cid : _cid.get_temp();
2687 string name = i.decode_string();
2688 bufferlist bl;
2689 i.decode_bl(bl);
2690 tracepoint(objectstore, setattr_enter, osr_name);
2691 if (_check_replay_guard(cid, oid, spos) > 0) {
2692 map<string, bufferptr> to_set;
2693 to_set[name] = bufferptr(bl.c_str(), bl.length());
2694 r = _setattrs(cid, oid, to_set, spos);
2695 if (r == -ENOSPC)
2696 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2697 << " name " << name << " size " << bl.length() << dendl;
2698 }
2699 tracepoint(objectstore, setattr_exit, r);
2700 }
2701 break;
2702
2703 case Transaction::OP_SETATTRS:
2704 {
2705 const coll_t &_cid = i.get_cid(op->cid);
2706 const ghobject_t &oid = i.get_oid(op->oid);
2707 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2708 _cid : _cid.get_temp();
2709 map<string, bufferptr> aset;
2710 i.decode_attrset(aset);
2711 tracepoint(objectstore, setattrs_enter, osr_name);
2712 if (_check_replay_guard(cid, oid, spos) > 0)
2713 r = _setattrs(cid, oid, aset, spos);
2714 tracepoint(objectstore, setattrs_exit, r);
2715 if (r == -ENOSPC)
2716 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2717 }
2718 break;
2719
2720 case Transaction::OP_RMATTR:
2721 {
2722 const coll_t &_cid = i.get_cid(op->cid);
2723 const ghobject_t &oid = i.get_oid(op->oid);
2724 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2725 _cid : _cid.get_temp();
2726 string name = i.decode_string();
2727 tracepoint(objectstore, rmattr_enter, osr_name);
2728 if (_check_replay_guard(cid, oid, spos) > 0)
2729 r = _rmattr(cid, oid, name.c_str(), spos);
2730 tracepoint(objectstore, rmattr_exit, r);
2731 }
2732 break;
2733
2734 case Transaction::OP_RMATTRS:
2735 {
2736 const coll_t &_cid = i.get_cid(op->cid);
2737 const ghobject_t &oid = i.get_oid(op->oid);
2738 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2739 _cid : _cid.get_temp();
2740 tracepoint(objectstore, rmattrs_enter, osr_name);
2741 if (_check_replay_guard(cid, oid, spos) > 0)
2742 r = _rmattrs(cid, oid, spos);
2743 tracepoint(objectstore, rmattrs_exit, r);
2744 }
2745 break;
2746
2747 case Transaction::OP_CLONE:
2748 {
2749 const coll_t &_cid = i.get_cid(op->cid);
2750 const ghobject_t &oid = i.get_oid(op->oid);
2751 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2752 _cid : _cid.get_temp();
2753 const ghobject_t &noid = i.get_oid(op->dest_oid);
2754 tracepoint(objectstore, clone_enter, osr_name);
2755 r = _clone(cid, oid, noid, spos);
2756 tracepoint(objectstore, clone_exit, r);
2757 }
2758 break;
2759
2760 case Transaction::OP_CLONERANGE:
2761 {
2762 const coll_t &_cid = i.get_cid(op->cid);
2763 const ghobject_t &oid = i.get_oid(op->oid);
2764 const ghobject_t &noid = i.get_oid(op->dest_oid);
2765 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2766 _cid : _cid.get_temp();
2767 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2768 _cid : _cid.get_temp();
2769 uint64_t off = op->off;
2770 uint64_t len = op->len;
2771 tracepoint(objectstore, clone_range_enter, osr_name, len);
2772 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2773 tracepoint(objectstore, clone_range_exit, r);
2774 }
2775 break;
2776
2777 case Transaction::OP_CLONERANGE2:
2778 {
2779 const coll_t &_cid = i.get_cid(op->cid);
2780 const ghobject_t &oid = i.get_oid(op->oid);
2781 const ghobject_t &noid = i.get_oid(op->dest_oid);
2782 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2783 _cid : _cid.get_temp();
2784 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2785 _cid : _cid.get_temp();
2786 uint64_t srcoff = op->off;
2787 uint64_t len = op->len;
2788 uint64_t dstoff = op->dest_off;
2789 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2790 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2791 tracepoint(objectstore, clone_range2_exit, r);
2792 }
2793 break;
2794
2795 case Transaction::OP_MKCOLL:
2796 {
2797 const coll_t &cid = i.get_cid(op->cid);
2798 tracepoint(objectstore, mkcoll_enter, osr_name);
2799 if (_check_replay_guard(cid, spos) > 0)
2800 r = _create_collection(cid, op->split_bits, spos);
2801 tracepoint(objectstore, mkcoll_exit, r);
2802 }
2803 break;
2804
2805 case Transaction::OP_COLL_SET_BITS:
2806 {
2807 const coll_t &cid = i.get_cid(op->cid);
2808 int bits = op->split_bits;
2809 r = _collection_set_bits(cid, bits);
2810 }
2811 break;
2812
2813 case Transaction::OP_COLL_HINT:
2814 {
2815 const coll_t &cid = i.get_cid(op->cid);
2816 uint32_t type = op->hint_type;
2817 bufferlist hint;
2818 i.decode_bl(hint);
2819 bufferlist::iterator hiter = hint.begin();
2820 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2821 uint32_t pg_num;
2822 uint64_t num_objs;
2823 ::decode(pg_num, hiter);
2824 ::decode(num_objs, hiter);
2825 if (_check_replay_guard(cid, spos) > 0) {
2826 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2827 }
2828 } else {
2829 // Ignore the hint
2830 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2831 }
2832 }
2833 break;
2834
2835 case Transaction::OP_RMCOLL:
2836 {
2837 const coll_t &cid = i.get_cid(op->cid);
2838 tracepoint(objectstore, rmcoll_enter, osr_name);
2839 if (_check_replay_guard(cid, spos) > 0)
2840 r = _destroy_collection(cid);
2841 tracepoint(objectstore, rmcoll_exit, r);
2842 }
2843 break;
2844
2845 case Transaction::OP_COLL_ADD:
2846 {
2847 const coll_t &ocid = i.get_cid(op->cid);
2848 const coll_t &ncid = i.get_cid(op->dest_cid);
2849 const ghobject_t &oid = i.get_oid(op->oid);
2850
2851 assert(oid.hobj.pool >= -1);
2852
2853 // always followed by OP_COLL_REMOVE
2854 Transaction::Op *op2 = i.decode_op();
2855 const coll_t &ocid2 = i.get_cid(op2->cid);
2856 const ghobject_t &oid2 = i.get_oid(op2->oid);
2857 assert(op2->op == Transaction::OP_COLL_REMOVE);
2858 assert(ocid2 == ocid);
2859 assert(oid2 == oid);
2860
2861 tracepoint(objectstore, coll_add_enter);
2862 r = _collection_add(ncid, ocid, oid, spos);
2863 tracepoint(objectstore, coll_add_exit, r);
2864 spos.op++;
2865 if (r < 0)
2866 break;
2867 tracepoint(objectstore, coll_remove_enter, osr_name);
2868 if (_check_replay_guard(ocid, oid, spos) > 0)
2869 r = _remove(ocid, oid, spos);
2870 tracepoint(objectstore, coll_remove_exit, r);
2871 }
2872 break;
2873
2874 case Transaction::OP_COLL_MOVE:
2875 {
2876 // WARNING: this is deprecated and buggy; only here to replay old journals.
2877 const coll_t &ocid = i.get_cid(op->cid);
2878 const coll_t &ncid = i.get_cid(op->dest_cid);
2879 const ghobject_t &oid = i.get_oid(op->oid);
2880 tracepoint(objectstore, coll_move_enter);
2881 r = _collection_add(ocid, ncid, oid, spos);
2882 if (r == 0 &&
2883 (_check_replay_guard(ocid, oid, spos) > 0))
2884 r = _remove(ocid, oid, spos);
2885 tracepoint(objectstore, coll_move_exit, r);
2886 }
2887 break;
2888
2889 case Transaction::OP_COLL_MOVE_RENAME:
2890 {
2891 const coll_t &_oldcid = i.get_cid(op->cid);
2892 const ghobject_t &oldoid = i.get_oid(op->oid);
2893 const coll_t &_newcid = i.get_cid(op->dest_cid);
2894 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2895 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2896 _oldcid : _oldcid.get_temp();
2897 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2898 _oldcid : _newcid.get_temp();
2899 tracepoint(objectstore, coll_move_rename_enter);
2900 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2901 tracepoint(objectstore, coll_move_rename_exit, r);
2902 }
2903 break;
2904
2905 case Transaction::OP_TRY_RENAME:
2906 {
2907 const coll_t &_cid = i.get_cid(op->cid);
2908 const ghobject_t &oldoid = i.get_oid(op->oid);
2909 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2910 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2911 _cid : _cid.get_temp();
2912 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2913 _cid : _cid.get_temp();
2914 tracepoint(objectstore, coll_try_rename_enter);
2915 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2916 tracepoint(objectstore, coll_try_rename_exit, r);
2917 }
2918 break;
2919
2920 case Transaction::OP_COLL_SETATTR:
2921 case Transaction::OP_COLL_RMATTR:
2922 assert(0 == "collection attr methods no longer implemented");
2923 break;
2924
2925 case Transaction::OP_STARTSYNC:
2926 tracepoint(objectstore, startsync_enter, osr_name);
2927 _start_sync();
2928 tracepoint(objectstore, startsync_exit);
2929 break;
2930
2931 case Transaction::OP_COLL_RENAME:
2932 {
2933 r = -EOPNOTSUPP;
2934 }
2935 break;
2936
2937 case Transaction::OP_OMAP_CLEAR:
2938 {
2939 const coll_t &_cid = i.get_cid(op->cid);
2940 const ghobject_t &oid = i.get_oid(op->oid);
2941 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2942 _cid : _cid.get_temp();
2943 tracepoint(objectstore, omap_clear_enter, osr_name);
2944 r = _omap_clear(cid, oid, spos);
2945 tracepoint(objectstore, omap_clear_exit, r);
2946 }
2947 break;
2948 case Transaction::OP_OMAP_SETKEYS:
2949 {
2950 const coll_t &_cid = i.get_cid(op->cid);
2951 const ghobject_t &oid = i.get_oid(op->oid);
2952 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2953 _cid : _cid.get_temp();
2954 map<string, bufferlist> aset;
2955 i.decode_attrset(aset);
2956 tracepoint(objectstore, omap_setkeys_enter, osr_name);
2957 r = _omap_setkeys(cid, oid, aset, spos);
2958 tracepoint(objectstore, omap_setkeys_exit, r);
2959 }
2960 break;
2961 case Transaction::OP_OMAP_RMKEYS:
2962 {
2963 const coll_t &_cid = i.get_cid(op->cid);
2964 const ghobject_t &oid = i.get_oid(op->oid);
2965 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2966 _cid : _cid.get_temp();
2967 set<string> keys;
2968 i.decode_keyset(keys);
2969 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
2970 r = _omap_rmkeys(cid, oid, keys, spos);
2971 tracepoint(objectstore, omap_rmkeys_exit, r);
2972 }
2973 break;
2974 case Transaction::OP_OMAP_RMKEYRANGE:
2975 {
2976 const coll_t &_cid = i.get_cid(op->cid);
2977 const ghobject_t &oid = i.get_oid(op->oid);
2978 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2979 _cid : _cid.get_temp();
2980 string first, last;
2981 first = i.decode_string();
2982 last = i.decode_string();
2983 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
2984 r = _omap_rmkeyrange(cid, oid, first, last, spos);
2985 tracepoint(objectstore, omap_rmkeyrange_exit, r);
2986 }
2987 break;
2988 case Transaction::OP_OMAP_SETHEADER:
2989 {
2990 const coll_t &_cid = i.get_cid(op->cid);
2991 const ghobject_t &oid = i.get_oid(op->oid);
2992 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2993 _cid : _cid.get_temp();
2994 bufferlist bl;
2995 i.decode_bl(bl);
2996 tracepoint(objectstore, omap_setheader_enter, osr_name);
2997 r = _omap_setheader(cid, oid, bl, spos);
2998 tracepoint(objectstore, omap_setheader_exit, r);
2999 }
3000 break;
3001 case Transaction::OP_SPLIT_COLLECTION:
3002 {
3003 assert(0 == "not legacy journal; upgrade to firefly first");
3004 }
3005 break;
3006 case Transaction::OP_SPLIT_COLLECTION2:
3007 {
3008 coll_t cid = i.get_cid(op->cid);
3009 uint32_t bits = op->split_bits;
3010 uint32_t rem = op->split_rem;
3011 coll_t dest = i.get_cid(op->dest_cid);
3012 tracepoint(objectstore, split_coll2_enter, osr_name);
3013 r = _split_collection(cid, bits, rem, dest, spos);
3014 tracepoint(objectstore, split_coll2_exit, r);
3015 }
3016 break;
3017
3018 case Transaction::OP_SETALLOCHINT:
3019 {
3020 const coll_t &_cid = i.get_cid(op->cid);
3021 const ghobject_t &oid = i.get_oid(op->oid);
3022 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3023 _cid : _cid.get_temp();
3024 uint64_t expected_object_size = op->expected_object_size;
3025 uint64_t expected_write_size = op->expected_write_size;
3026 tracepoint(objectstore, setallochint_enter, osr_name);
3027 if (_check_replay_guard(cid, oid, spos) > 0)
3028 r = _set_alloc_hint(cid, oid, expected_object_size,
3029 expected_write_size);
3030 tracepoint(objectstore, setallochint_exit, r);
3031 }
3032 break;
3033
3034 default:
3035 derr << "bad op " << op->op << dendl;
3036 ceph_abort();
3037 }
3038
3039 if (r < 0) {
3040 bool ok = false;
3041
3042 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3043 op->op == Transaction::OP_CLONE ||
3044 op->op == Transaction::OP_CLONERANGE2 ||
3045 op->op == Transaction::OP_COLL_ADD ||
3046 op->op == Transaction::OP_SETATTR ||
3047 op->op == Transaction::OP_SETATTRS ||
3048 op->op == Transaction::OP_RMATTR ||
3049 op->op == Transaction::OP_OMAP_SETKEYS ||
3050 op->op == Transaction::OP_OMAP_RMKEYS ||
3051 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3052 op->op == Transaction::OP_OMAP_SETHEADER))
3053 // -ENOENT is normally okay
3054 // ...including on a replayed OP_RMCOLL with checkpoint mode
3055 ok = true;
3056 if (r == -ENODATA)
3057 ok = true;
3058
3059 if (op->op == Transaction::OP_SETALLOCHINT)
3060 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3061 // cases means invalid hint size (e.g. too big, not a multiple
3062 // of block size, etc) or, at least on xfs, an attempt to set
3063 // or change it when the file is not empty. However,
3064 // OP_SETALLOCHINT is advisory, so ignore all errors.
3065 ok = true;
3066
3067 if (replaying && !backend->can_checkpoint()) {
3068 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3069 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3070 ok = true;
3071 }
3072 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3073 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3074 ok = true;
3075 }
3076 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3077 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3078 ok = true;
3079 }
3080 if (r == -ERANGE) {
3081 dout(10) << "tolerating ERANGE on replay" << dendl;
3082 ok = true;
3083 }
3084 if (r == -ENOENT) {
3085 dout(10) << "tolerating ENOENT on replay" << dendl;
3086 ok = true;
3087 }
3088 }
3089
3090 if (!ok) {
3091 const char *msg = "unexpected error code";
3092
3093 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3094 op->op == Transaction::OP_CLONE ||
3095 op->op == Transaction::OP_CLONERANGE2)) {
3096 msg = "ENOENT on clone suggests osd bug";
3097 } else if (r == -ENOSPC) {
3098 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3099 // by partially applying transactions.
3100 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3101 } else if (r == -ENOTEMPTY) {
3102 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3103 } else if (r == -EPERM) {
3104 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3105 }
3106
3107 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3108 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3109 dout(0) << msg << dendl;
3110 dout(0) << " transaction dump:\n";
3111 JSONFormatter f(true);
3112 f.open_object_section("transaction");
3113 t.dump(&f);
3114 f.close_section();
3115 f.flush(*_dout);
3116 *_dout << dendl;
3117
3118 if (r == -EMFILE) {
3119 dump_open_fds(cct);
3120 }
3121
3122 assert(0 == "unexpected error");
3123 }
3124 }
3125
3126 spos.op++;
3127 }
3128
3129 _inject_failure();
3130 }
3131
3132 /*********************************************/
3133
3134
3135
3136 // --------------------
3137 // objects
3138
3139 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3140 {
3141 tracepoint(objectstore, exists_enter, _cid.c_str());
3142 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3143 struct stat st;
3144 bool retval = stat(cid, oid, &st) == 0;
3145 tracepoint(objectstore, exists_exit, retval);
3146 return retval;
3147 }
3148
3149 int FileStore::stat(
3150 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3151 {
3152 tracepoint(objectstore, stat_enter, _cid.c_str());
3153 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3154 int r = lfn_stat(cid, oid, st);
3155 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3156 if (r < 0) {
3157 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3158 << " = " << r << dendl;
3159 } else {
3160 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3161 << " = " << r
3162 << " (size " << st->st_size << ")" << dendl;
3163 }
3164 if (cct->_conf->filestore_debug_inject_read_err &&
3165 debug_mdata_eio(oid)) {
3166 return -EIO;
3167 } else {
3168 tracepoint(objectstore, stat_exit, r);
3169 return r;
3170 }
3171 }
3172
3173 int FileStore::set_collection_opts(
3174 const coll_t& cid,
3175 const pool_opts_t& opts)
3176 {
3177 return -EOPNOTSUPP;
3178 }
3179
3180 int FileStore::read(
3181 const coll_t& _cid,
3182 const ghobject_t& oid,
3183 uint64_t offset,
3184 size_t len,
3185 bufferlist& bl,
3186 uint32_t op_flags,
3187 bool allow_eio)
3188 {
3189 int got;
3190 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3191 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3192
3193 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3194
3195 FDRef fd;
3196 int r = lfn_open(cid, oid, false, &fd);
3197 if (r < 0) {
3198 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3199 << cpp_strerror(r) << dendl;
3200 return r;
3201 }
3202
3203 if (offset == 0 && len == 0) {
3204 struct stat st;
3205 memset(&st, 0, sizeof(struct stat));
3206 int r = ::fstat(**fd, &st);
3207 assert(r == 0);
3208 len = st.st_size;
3209 }
3210
3211 #ifdef HAVE_POSIX_FADVISE
3212 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3213 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3214 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3215 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3216 #endif
3217
3218 bufferptr bptr(len); // prealloc space for entire read
3219 got = safe_pread(**fd, bptr.c_str(), len, offset);
3220 if (got < 0) {
3221 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3222 lfn_close(fd);
3223 if (!(allow_eio || !m_filestore_fail_eio || got != -EIO)) {
3224 derr << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3225 assert(0 == "eio on pread");
3226 }
3227 return got;
3228 }
3229 bptr.set_length(got); // properly size the buffer
3230 bl.clear();
3231 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3232
3233 #ifdef HAVE_POSIX_FADVISE
3234 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3235 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3236 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3237 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3238 #endif
3239
3240 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3241 ostringstream ss;
3242 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3243 if (errors != 0) {
3244 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3245 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3246 assert(0 == "bad crc on read");
3247 }
3248 }
3249
3250 lfn_close(fd);
3251
3252 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3253 << got << "/" << len << dendl;
3254 if (cct->_conf->filestore_debug_inject_read_err &&
3255 debug_data_eio(oid)) {
3256 return -EIO;
3257 } else {
3258 tracepoint(objectstore, read_exit, got);
3259 return got;
3260 }
3261 }
3262
3263 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3264 map<uint64_t, uint64_t> *m)
3265 {
3266 uint64_t i;
3267 struct fiemap_extent *extent = NULL;
3268 struct fiemap *fiemap = NULL;
3269 int r = 0;
3270
3271 more:
3272 r = backend->do_fiemap(fd, offset, len, &fiemap);
3273 if (r < 0)
3274 return r;
3275
3276 if (fiemap->fm_mapped_extents == 0) {
3277 free(fiemap);
3278 return r;
3279 }
3280
3281 extent = &fiemap->fm_extents[0];
3282
3283 /* start where we were asked to start */
3284 if (extent->fe_logical < offset) {
3285 extent->fe_length -= offset - extent->fe_logical;
3286 extent->fe_logical = offset;
3287 }
3288
3289 i = 0;
3290
3291 struct fiemap_extent *last = nullptr;
3292 while (i < fiemap->fm_mapped_extents) {
3293 struct fiemap_extent *next = extent + 1;
3294
3295 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
3296 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3297
3298 /* try to merge extents */
3299 while ((i < fiemap->fm_mapped_extents - 1) &&
3300 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3301 next->fe_length += extent->fe_length;
3302 next->fe_logical = extent->fe_logical;
3303 extent = next;
3304 next = extent + 1;
3305 i++;
3306 }
3307
3308 if (extent->fe_logical + extent->fe_length > offset + len)
3309 extent->fe_length = offset + len - extent->fe_logical;
3310 (*m)[extent->fe_logical] = extent->fe_length;
3311 i++;
3312 last = extent++;
3313 }
3314 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3315 offset = last->fe_logical + last->fe_length;
3316 len -= xoffset;
3317 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3318 free(fiemap);
3319 if (!is_last) {
3320 goto more;
3321 }
3322
3323 return r;
3324 }
3325
3326 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3327 map<uint64_t, uint64_t> *m)
3328 {
3329 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3330 off_t hole_pos, data_pos;
3331 int r = 0;
3332
3333 // If lseek fails with errno setting to be ENXIO, this means the current
3334 // file offset is beyond the end of the file.
3335 off_t start = offset;
3336 while(start < (off_t)(offset + len)) {
3337 data_pos = lseek(fd, start, SEEK_DATA);
3338 if (data_pos < 0) {
3339 if (errno == ENXIO)
3340 break;
3341 else {
3342 r = -errno;
3343 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3344 return r;
3345 }
3346 } else if (data_pos > (off_t)(offset + len)) {
3347 break;
3348 }
3349
3350 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3351 if (hole_pos < 0) {
3352 if (errno == ENXIO) {
3353 break;
3354 } else {
3355 r = -errno;
3356 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3357 return r;
3358 }
3359 }
3360
3361 if (hole_pos >= (off_t)(offset + len)) {
3362 (*m)[data_pos] = offset + len - data_pos;
3363 break;
3364 }
3365 (*m)[data_pos] = hole_pos - data_pos;
3366 start = hole_pos;
3367 }
3368
3369 return r;
3370 #else
3371 (*m)[offset] = len;
3372 return 0;
3373 #endif
3374 }
3375
3376 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3377 uint64_t offset, size_t len,
3378 bufferlist& bl)
3379 {
3380 map<uint64_t, uint64_t> exomap;
3381 int r = fiemap(_cid, oid, offset, len, exomap);
3382 if (r >= 0) {
3383 ::encode(exomap, bl);
3384 }
3385 return r;
3386 }
3387
3388 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3389 uint64_t offset, size_t len,
3390 map<uint64_t, uint64_t>& destmap)
3391 {
3392 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3393 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3394 destmap.clear();
3395
3396 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3397 len <= (size_t)m_filestore_fiemap_threshold) {
3398 destmap[offset] = len;
3399 return 0;
3400 }
3401
3402 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3403
3404 FDRef fd;
3405
3406 int r = lfn_open(cid, oid, false, &fd);
3407 if (r < 0) {
3408 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3409 goto done;
3410 }
3411
3412 if (backend->has_seek_data_hole()) {
3413 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3414 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3415 } else if (backend->has_fiemap()) {
3416 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3417 r = _do_fiemap(**fd, offset, len, &destmap);
3418 }
3419
3420 lfn_close(fd);
3421
3422 done:
3423
3424 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3425 assert(!m_filestore_fail_eio || r != -EIO);
3426 tracepoint(objectstore, fiemap_exit, r);
3427 return r;
3428 }
3429
3430 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3431 const SequencerPosition &spos)
3432 {
3433 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3434 int r = lfn_unlink(cid, oid, spos);
3435 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3436 return r;
3437 }
3438
3439 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3440 {
3441 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3442 int r = lfn_truncate(cid, oid, size);
3443 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3444 return r;
3445 }
3446
3447
3448 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3449 {
3450 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3451
3452 FDRef fd;
3453 int r = lfn_open(cid, oid, true, &fd);
3454 if (r < 0) {
3455 return r;
3456 } else {
3457 lfn_close(fd);
3458 }
3459 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3460 return r;
3461 }
3462
3463 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3464 uint64_t offset, size_t len,
3465 const bufferlist& bl, uint32_t fadvise_flags)
3466 {
3467 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3468 int r;
3469
3470 FDRef fd;
3471 r = lfn_open(cid, oid, true, &fd);
3472 if (r < 0) {
3473 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3474 << oid << ": "
3475 << cpp_strerror(r) << dendl;
3476 goto out;
3477 }
3478
3479 // write
3480 r = bl.write_fd(**fd, offset);
3481 if (r < 0) {
3482 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3483 << " error: " << cpp_strerror(r) << dendl;
3484 lfn_close(fd);
3485 goto out;
3486 }
3487 r = bl.length();
3488
3489 if (r >= 0 && m_filestore_sloppy_crc) {
3490 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3491 assert(rc >= 0);
3492 }
3493
3494 if (replaying || m_disable_wbthrottle) {
3495 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3496 #ifdef HAVE_POSIX_FADVISE
3497 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3498 #endif
3499 }
3500 } else {
3501 wbthrottle.queue_wb(fd, oid, offset, len,
3502 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3503 }
3504
3505 lfn_close(fd);
3506
3507 out:
3508 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3509 return r;
3510 }
3511
3512 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3513 {
3514 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3515 int ret = 0;
3516
3517 if (cct->_conf->filestore_punch_hole) {
3518 #ifdef CEPH_HAVE_FALLOCATE
3519 # if !defined(DARWIN) && !defined(__FreeBSD__)
3520 # ifdef FALLOC_FL_KEEP_SIZE
3521 // first try to punch a hole.
3522 FDRef fd;
3523 ret = lfn_open(cid, oid, false, &fd);
3524 if (ret < 0) {
3525 goto out;
3526 }
3527
3528 struct stat st;
3529 ret = ::fstat(**fd, &st);
3530 if (ret < 0) {
3531 ret = -errno;
3532 lfn_close(fd);
3533 goto out;
3534 }
3535
3536 // first try fallocate
3537 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3538 offset, len);
3539 if (ret < 0) {
3540 ret = -errno;
3541 } else {
3542 // ensure we extent file size, if needed
3543 if (offset + len > (uint64_t)st.st_size) {
3544 ret = ::ftruncate(**fd, offset + len);
3545 if (ret < 0) {
3546 ret = -errno;
3547 lfn_close(fd);
3548 goto out;
3549 }
3550 }
3551 }
3552 lfn_close(fd);
3553
3554 if (ret >= 0 && m_filestore_sloppy_crc) {
3555 int rc = backend->_crc_update_zero(**fd, offset, len);
3556 assert(rc >= 0);
3557 }
3558
3559 if (ret == 0)
3560 goto out; // yay!
3561 if (ret != -EOPNOTSUPP)
3562 goto out; // some other error
3563 # endif
3564 # endif
3565 #endif
3566 }
3567
3568 // lame, kernel is old and doesn't support it.
3569 // write zeros.. yuck!
3570 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3571 {
3572 bufferlist bl;
3573 bl.append_zero(len);
3574 ret = _write(cid, oid, offset, len, bl);
3575 }
3576
3577 #ifdef CEPH_HAVE_FALLOCATE
3578 # if !defined(DARWIN) && !defined(__FreeBSD__)
3579 # ifdef FALLOC_FL_KEEP_SIZE
3580 out:
3581 # endif
3582 # endif
3583 #endif
3584 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3585 return ret;
3586 }
3587
3588 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3589 const SequencerPosition& spos)
3590 {
3591 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3592
3593 if (_check_replay_guard(cid, newoid, spos) < 0)
3594 return 0;
3595
3596 int r;
3597 FDRef o, n;
3598 {
3599 Index index;
3600 r = lfn_open(cid, oldoid, false, &o, &index);
3601 if (r < 0) {
3602 goto out2;
3603 }
3604 assert(NULL != (index.index));
3605 RWLock::WLocker l((index.index)->access_lock);
3606
3607 r = lfn_open(cid, newoid, true, &n, &index);
3608 if (r < 0) {
3609 goto out;
3610 }
3611 r = ::ftruncate(**n, 0);
3612 if (r < 0) {
3613 r = -errno;
3614 goto out3;
3615 }
3616 struct stat st;
3617 r = ::fstat(**o, &st);
3618 if (r < 0) {
3619 r = -errno;
3620 goto out3;
3621 }
3622
3623 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3624 if (r < 0) {
3625 goto out3;
3626 }
3627
3628 dout(20) << "objectmap clone" << dendl;
3629 r = object_map->clone(oldoid, newoid, &spos);
3630 if (r < 0 && r != -ENOENT)
3631 goto out3;
3632 }
3633
3634 {
3635 char buf[2];
3636 map<string, bufferptr> aset;
3637 r = _fgetattrs(**o, aset);
3638 if (r < 0)
3639 goto out3;
3640
3641 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3642 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3643 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3644 sizeof(XATTR_NO_SPILL_OUT));
3645 } else {
3646 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3647 sizeof(XATTR_SPILL_OUT));
3648 }
3649 if (r < 0)
3650 goto out3;
3651
3652 r = _fsetattrs(**n, aset);
3653 if (r < 0)
3654 goto out3;
3655 }
3656
3657 // clone is non-idempotent; record our work.
3658 _set_replay_guard(**n, spos, &newoid);
3659
3660 out3:
3661 lfn_close(n);
3662 out:
3663 lfn_close(o);
3664 out2:
3665 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3666 assert(!m_filestore_fail_eio || r != -EIO);
3667 return r;
3668 }
3669
3670 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3671 {
3672 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3673 return backend->clone_range(from, to, srcoff, len, dstoff);
3674 }
3675
3676 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3677 {
3678 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3679 int r = 0;
3680 map<uint64_t, uint64_t> exomap;
3681 // fiemap doesn't allow zero length
3682 if (len == 0)
3683 return 0;
3684
3685 if (backend->has_seek_data_hole()) {
3686 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3687 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3688 } else if (backend->has_fiemap()) {
3689 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3690 r = _do_fiemap(from, srcoff, len, &exomap);
3691 }
3692
3693
3694 int64_t written = 0;
3695 if (r < 0)
3696 goto out;
3697
3698 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3699 uint64_t it_off = miter->first - srcoff + dstoff;
3700 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3701 if (r < 0) {
3702 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3703 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3704 break;
3705 }
3706 written += miter->second;
3707 }
3708
3709 if (r >= 0) {
3710 if (m_filestore_sloppy_crc) {
3711 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3712 assert(rc >= 0);
3713 }
3714 struct stat st;
3715 r = ::fstat(to, &st);
3716 if (r < 0) {
3717 r = -errno;
3718 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3719 goto out;
3720 }
3721 if (st.st_size < (int)(dstoff + len)) {
3722 r = ::ftruncate(to, dstoff + len);
3723 if (r < 0) {
3724 r = -errno;
3725 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3726 goto out;
3727 }
3728 }
3729 r = written;
3730 }
3731
3732 out:
3733 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3734 return r;
3735 }
3736
3737 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3738 {
3739 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3740 int r = 0;
3741 loff_t pos = srcoff;
3742 loff_t end = srcoff + len;
3743 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3744
3745 #ifdef CEPH_HAVE_SPLICE
3746 if (backend->has_splice()) {
3747 int pipefd[2];
3748 if (pipe(pipefd) < 0) {
3749 r = -errno;
3750 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3751 return r;
3752 }
3753
3754 loff_t dstpos = dstoff;
3755 while (pos < end) {
3756 int l = MIN(end-pos, buflen);
3757 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3758 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3759 if (r < 0) {
3760 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3761 << ", " << cpp_strerror(r) << dendl;
3762 break;
3763 }
3764 if (r == 0) {
3765 // hrm, bad source range, wtf.
3766 r = -ERANGE;
3767 derr << __FUNC__ << ": got short read result at " << pos
3768 << " of fd " << from << " len " << len << dendl;
3769 break;
3770 }
3771
3772 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3773 dout(10) << " safe_splice write to " << to << " len " << r
3774 << " got " << r << dendl;
3775 if (r < 0) {
3776 derr << __FUNC__ << ": write error at " << pos << "~"
3777 << r << ", " << cpp_strerror(r) << dendl;
3778 break;
3779 }
3780 }
3781 close(pipefd[0]);
3782 close(pipefd[1]);
3783 } else
3784 #endif
3785 {
3786 int64_t actual;
3787
3788 actual = ::lseek64(from, srcoff, SEEK_SET);
3789 if (actual != (int64_t)srcoff) {
3790 if (actual < 0)
3791 r = -errno;
3792 else
3793 r = -EINVAL;
3794 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3795 return r;
3796 }
3797 actual = ::lseek64(to, dstoff, SEEK_SET);
3798 if (actual != (int64_t)dstoff) {
3799 if (actual < 0)
3800 r = -errno;
3801 else
3802 r = -EINVAL;
3803 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3804 return r;
3805 }
3806
3807 char buf[buflen];
3808 while (pos < end) {
3809 int l = MIN(end-pos, buflen);
3810 r = ::read(from, buf, l);
3811 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3812 if (r < 0) {
3813 if (errno == EINTR) {
3814 continue;
3815 } else {
3816 r = -errno;
3817 derr << __FUNC__ << ": read error at " << pos << "~" << len
3818 << ", " << cpp_strerror(r) << dendl;
3819 break;
3820 }
3821 }
3822 if (r == 0) {
3823 // hrm, bad source range, wtf.
3824 r = -ERANGE;
3825 derr << __FUNC__ << ": got short read result at " << pos
3826 << " of fd " << from << " len " << len << dendl;
3827 break;
3828 }
3829 int op = 0;
3830 while (op < r) {
3831 int r2 = safe_write(to, buf+op, r-op);
3832 dout(25) << " write to " << to << " len " << (r-op)
3833 << " got " << r2 << dendl;
3834 if (r2 < 0) {
3835 r = r2;
3836 derr << __FUNC__ << ": write error at " << pos << "~"
3837 << r-op << ", " << cpp_strerror(r) << dendl;
3838
3839 break;
3840 }
3841 op += (r-op);
3842 }
3843 if (r < 0)
3844 break;
3845 pos += r;
3846 }
3847 }
3848
3849 if (r < 0 && replaying) {
3850 assert(r == -ERANGE);
3851 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
3852 r = pos - from;;
3853 }
3854 assert(replaying || pos == end);
3855 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3856 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3857 assert(rc >= 0);
3858 }
3859 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3860 return r;
3861 }
3862
3863 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3864 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3865 const SequencerPosition& spos)
3866 {
3867 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3868
3869 if (_check_replay_guard(newcid, newoid, spos) < 0)
3870 return 0;
3871
3872 int r;
3873 FDRef o, n;
3874 r = lfn_open(oldcid, oldoid, false, &o);
3875 if (r < 0) {
3876 goto out2;
3877 }
3878 r = lfn_open(newcid, newoid, true, &n);
3879 if (r < 0) {
3880 goto out;
3881 }
3882 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3883 if (r < 0) {
3884 goto out3;
3885 }
3886
3887 // clone is non-idempotent; record our work.
3888 _set_replay_guard(**n, spos, &newoid);
3889
3890 out3:
3891 lfn_close(n);
3892 out:
3893 lfn_close(o);
3894 out2:
3895 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3896 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3897 return r;
3898 }
3899
3900 class SyncEntryTimeout : public Context {
3901 public:
3902 CephContext* cct;
3903 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3904 : cct(cct), m_commit_timeo(commit_timeo)
3905 {
3906 }
3907
3908 void finish(int r) override {
3909 BackTrace *bt = new BackTrace(1);
3910 generic_dout(-1) << "FileStore: sync_entry timed out after "
3911 << m_commit_timeo << " seconds.\n";
3912 bt->print(*_dout);
3913 *_dout << dendl;
3914 delete bt;
3915 ceph_abort();
3916 }
3917 private:
3918 int m_commit_timeo;
3919 };
3920
3921 void FileStore::sync_entry()
3922 {
3923 lock.Lock();
3924 while (!stop) {
3925 utime_t max_interval;
3926 max_interval.set_from_double(m_filestore_max_sync_interval);
3927 utime_t min_interval;
3928 min_interval.set_from_double(m_filestore_min_sync_interval);
3929
3930 utime_t startwait = ceph_clock_now();
3931 if (!force_sync) {
3932 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
3933 sync_cond.WaitInterval(lock, max_interval);
3934 } else {
3935 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
3936 }
3937
3938 if (force_sync) {
3939 dout(20) << __FUNC__ << ": force_sync set" << dendl;
3940 force_sync = false;
3941 } else if (stop) {
3942 dout(20) << __FUNC__ << ": stop set" << dendl;
3943 break;
3944 } else {
3945 // wait for at least the min interval
3946 utime_t woke = ceph_clock_now();
3947 woke -= startwait;
3948 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
3949 if (woke < min_interval) {
3950 utime_t t = min_interval;
3951 t -= woke;
3952 dout(20) << __FUNC__ << ": waiting for another " << t
3953 << " to reach min interval " << min_interval << dendl;
3954 sync_cond.WaitInterval(lock, t);
3955 }
3956 }
3957
3958 list<Context*> fin;
3959 again:
3960 fin.swap(sync_waiters);
3961 lock.Unlock();
3962
3963 op_tp.pause();
3964 if (apply_manager.commit_start()) {
3965 utime_t start = ceph_clock_now();
3966 uint64_t cp = apply_manager.get_committing_seq();
3967
3968 sync_entry_timeo_lock.Lock();
3969 SyncEntryTimeout *sync_entry_timeo =
3970 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
3971 timer.add_event_after(m_filestore_commit_timeout, sync_entry_timeo);
3972 sync_entry_timeo_lock.Unlock();
3973
3974 logger->set(l_filestore_committing, 1);
3975
3976 dout(15) << __FUNC__ << ": committing " << cp << dendl;
3977 stringstream errstream;
3978 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
3979 derr << errstream.str() << dendl;
3980 ceph_abort();
3981 }
3982
3983 if (backend->can_checkpoint()) {
3984 int err = write_op_seq(op_fd, cp);
3985 if (err < 0) {
3986 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
3987 assert(0 == "error during write_op_seq");
3988 }
3989
3990 char s[NAME_MAX];
3991 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
3992 uint64_t cid = 0;
3993 err = backend->create_checkpoint(s, &cid);
3994 if (err < 0) {
3995 int err = errno;
3996 derr << "snap create '" << s << "' got error " << err << dendl;
3997 assert(err == 0);
3998 }
3999
4000 snaps.push_back(cp);
4001 apply_manager.commit_started();
4002 op_tp.unpause();
4003
4004 if (cid > 0) {
4005 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4006 err = backend->sync_checkpoint(cid);
4007 if (err < 0) {
4008 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4009 assert(0 == "wait_sync got error");
4010 }
4011 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4012 }
4013 } else
4014 {
4015 apply_manager.commit_started();
4016 op_tp.unpause();
4017
4018 int err = object_map->sync();
4019 if (err < 0) {
4020 derr << "object_map sync got " << cpp_strerror(err) << dendl;
4021 assert(0 == "object_map sync returned error");
4022 }
4023
4024 err = backend->syncfs();
4025 if (err < 0) {
4026 derr << "syncfs got " << cpp_strerror(err) << dendl;
4027 assert(0 == "syncfs returned error");
4028 }
4029
4030 err = write_op_seq(op_fd, cp);
4031 if (err < 0) {
4032 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4033 assert(0 == "error during write_op_seq");
4034 }
4035 err = ::fsync(op_fd);
4036 if (err < 0) {
4037 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4038 assert(0 == "error during fsync of op_seq");
4039 }
4040 }
4041
4042 utime_t done = ceph_clock_now();
4043 utime_t lat = done - start;
4044 utime_t dur = done - startwait;
4045 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
4046
4047 logger->inc(l_filestore_commitcycle);
4048 logger->tinc(l_filestore_commitcycle_latency, lat);
4049 logger->tinc(l_filestore_commitcycle_interval, dur);
4050
4051 apply_manager.commit_finish();
4052 if (!m_disable_wbthrottle) {
4053 wbthrottle.clear();
4054 }
4055
4056 logger->set(l_filestore_committing, 0);
4057
4058 // remove old snaps?
4059 if (backend->can_checkpoint()) {
4060 char s[NAME_MAX];
4061 while (snaps.size() > 2) {
4062 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4063 snaps.pop_front();
4064 dout(10) << "removing snap '" << s << "'" << dendl;
4065 int r = backend->destroy_checkpoint(s);
4066 if (r) {
4067 int err = errno;
4068 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4069 }
4070 }
4071 }
4072
4073 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
4074
4075 sync_entry_timeo_lock.Lock();
4076 timer.cancel_event(sync_entry_timeo);
4077 sync_entry_timeo_lock.Unlock();
4078 } else {
4079 op_tp.unpause();
4080 }
4081
4082 lock.Lock();
4083 finish_contexts(cct, fin, 0);
4084 fin.clear();
4085 if (!sync_waiters.empty()) {
4086 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
4087 goto again;
4088 }
4089 if (!stop && journal && journal->should_commit_now()) {
4090 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
4091 goto again;
4092 }
4093 }
4094 stop = false;
4095 lock.Unlock();
4096 }
4097
4098 void FileStore::_start_sync()
4099 {
4100 if (!journal) { // don't do a big sync if the journal is on
4101 dout(10) << __FUNC__ << dendl;
4102 sync_cond.Signal();
4103 } else {
4104 dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
4105 }
4106 }
4107
4108 void FileStore::do_force_sync()
4109 {
4110 dout(10) << __FUNC__ << dendl;
4111 Mutex::Locker l(lock);
4112 force_sync = true;
4113 sync_cond.Signal();
4114 }
4115
4116 void FileStore::start_sync(Context *onsafe)
4117 {
4118 Mutex::Locker l(lock);
4119 sync_waiters.push_back(onsafe);
4120 sync_cond.Signal();
4121 force_sync = true;
4122 dout(10) << __FUNC__ << dendl;
4123 }
4124
4125 void FileStore::sync()
4126 {
4127 Mutex l("FileStore::sync");
4128 Cond c;
4129 bool done;
4130 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4131
4132 start_sync(fin);
4133
4134 l.Lock();
4135 while (!done) {
4136 dout(10) << "sync waiting" << dendl;
4137 c.Wait(l);
4138 }
4139 l.Unlock();
4140 dout(10) << "sync done" << dendl;
4141 }
4142
4143 void FileStore::_flush_op_queue()
4144 {
4145 dout(10) << __FUNC__ << ": draining op tp" << dendl;
4146 op_wq.drain();
4147 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4148 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4149 (*it)->wait_for_empty();
4150 }
4151 }
4152
4153 /*
4154 * flush - make every queued write readable
4155 */
4156 void FileStore::flush()
4157 {
4158 dout(10) << __FUNC__ << dendl;
4159
4160 if (cct->_conf->filestore_blackhole) {
4161 // wait forever
4162 Mutex lock("FileStore::flush::lock");
4163 Cond cond;
4164 lock.Lock();
4165 while (true)
4166 cond.Wait(lock);
4167 ceph_abort();
4168 }
4169
4170 if (m_filestore_journal_writeahead) {
4171 if (journal)
4172 journal->flush();
4173 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4174 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4175 (*it)->wait_for_empty();
4176 }
4177 }
4178
4179 _flush_op_queue();
4180 dout(10) << __FUNC__ << ": complete" << dendl;
4181 }
4182
4183 /*
4184 * sync_and_flush - make every queued write readable AND committed to disk
4185 */
4186 void FileStore::sync_and_flush()
4187 {
4188 dout(10) << __FUNC__ << dendl;
4189
4190 if (m_filestore_journal_writeahead) {
4191 if (journal)
4192 journal->flush();
4193 _flush_op_queue();
4194 } else {
4195 // includes m_filestore_journal_parallel
4196 _flush_op_queue();
4197 sync();
4198 }
4199 dout(10) << __FUNC__ << ": done" << dendl;
4200 }
4201
4202 int FileStore::flush_journal()
4203 {
4204 dout(10) << __FUNC__ << dendl;
4205 sync_and_flush();
4206 sync();
4207 return 0;
4208 }
4209
4210 int FileStore::snapshot(const string& name)
4211 {
4212 dout(10) << __FUNC__ << ": " << name << dendl;
4213 sync_and_flush();
4214
4215 if (!backend->can_checkpoint()) {
4216 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4217 return -EOPNOTSUPP;
4218 }
4219
4220 char s[NAME_MAX];
4221 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4222
4223 int r = backend->create_checkpoint(s, NULL);
4224 if (r) {
4225 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4226 }
4227
4228 return r;
4229 }
4230
4231 // -------------------------------
4232 // attributes
4233
4234 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4235 {
4236 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4237 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4238 if (l >= 0) {
4239 bp = buffer::create(l);
4240 memcpy(bp.c_str(), val, l);
4241 } else if (l == -ERANGE) {
4242 l = chain_fgetxattr(fd, name, 0, 0);
4243 if (l > 0) {
4244 bp = buffer::create(l);
4245 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4246 }
4247 }
4248 assert(!m_filestore_fail_eio || l != -EIO);
4249 return l;
4250 }
4251
4252 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4253 {
4254 // get attr list
4255 char names1[100];
4256 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4257 char *names2 = 0;
4258 char *name = 0;
4259 if (len == -ERANGE) {
4260 len = chain_flistxattr(fd, 0, 0);
4261 if (len < 0) {
4262 assert(!m_filestore_fail_eio || len != -EIO);
4263 return len;
4264 }
4265 dout(10) << " -ERANGE, len is " << len << dendl;
4266 names2 = new char[len+1];
4267 len = chain_flistxattr(fd, names2, len);
4268 dout(10) << " -ERANGE, got " << len << dendl;
4269 if (len < 0) {
4270 assert(!m_filestore_fail_eio || len != -EIO);
4271 delete[] names2;
4272 return len;
4273 }
4274 name = names2;
4275 } else if (len < 0) {
4276 assert(!m_filestore_fail_eio || len != -EIO);
4277 return len;
4278 } else {
4279 name = names1;
4280 }
4281 name[len] = 0;
4282
4283 char *end = name + len;
4284 while (name < end) {
4285 char *attrname = name;
4286 if (parse_attrname(&name)) {
4287 if (*name) {
4288 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4289 int r = _fgetattr(fd, attrname, aset[name]);
4290 if (r < 0) {
4291 delete[] names2;
4292 return r;
4293 }
4294 }
4295 }
4296 name += strlen(name) + 1;
4297 }
4298
4299 delete[] names2;
4300 return 0;
4301 }
4302
4303 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4304 {
4305 for (map<string, bufferptr>::iterator p = aset.begin();
4306 p != aset.end();
4307 ++p) {
4308 char n[CHAIN_XATTR_MAX_NAME_LEN];
4309 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4310 const char *val;
4311 if (p->second.length())
4312 val = p->second.c_str();
4313 else
4314 val = "";
4315 // ??? Why do we skip setting all the other attrs if one fails?
4316 int r = chain_fsetxattr(fd, n, val, p->second.length());
4317 if (r < 0) {
4318 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4319 return r;
4320 }
4321 }
4322 return 0;
4323 }
4324
4325 // debug EIO injection
4326 void FileStore::inject_data_error(const ghobject_t &oid) {
4327 Mutex::Locker l(read_error_lock);
4328 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4329 data_error_set.insert(oid);
4330 }
4331 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4332 Mutex::Locker l(read_error_lock);
4333 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4334 mdata_error_set.insert(oid);
4335 }
4336 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4337 Mutex::Locker l(read_error_lock);
4338 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4339 data_error_set.erase(oid);
4340 mdata_error_set.erase(oid);
4341 }
4342 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4343 Mutex::Locker l(read_error_lock);
4344 if (data_error_set.count(oid)) {
4345 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4346 return true;
4347 } else {
4348 return false;
4349 }
4350 }
4351 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4352 Mutex::Locker l(read_error_lock);
4353 if (mdata_error_set.count(oid)) {
4354 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4355 return true;
4356 } else {
4357 return false;
4358 }
4359 }
4360
4361
4362 // objects
4363
4364 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4365 {
4366 tracepoint(objectstore, getattr_enter, _cid.c_str());
4367 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4368 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4369 FDRef fd;
4370 int r = lfn_open(cid, oid, false, &fd);
4371 if (r < 0) {
4372 goto out;
4373 }
4374 char n[CHAIN_XATTR_MAX_NAME_LEN];
4375 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4376 r = _fgetattr(**fd, n, bp);
4377 lfn_close(fd);
4378 if (r == -ENODATA) {
4379 map<string, bufferlist> got;
4380 set<string> to_get;
4381 to_get.insert(string(name));
4382 Index index;
4383 r = get_index(cid, &index);
4384 if (r < 0) {
4385 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4386 goto out;
4387 }
4388 r = object_map->get_xattrs(oid, to_get, &got);
4389 if (r < 0 && r != -ENOENT) {
4390 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4391 goto out;
4392 }
4393 if (got.empty()) {
4394 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4395 return -ENODATA;
4396 }
4397 bp = bufferptr(got.begin()->second.c_str(),
4398 got.begin()->second.length());
4399 r = bp.length();
4400 }
4401 out:
4402 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4403 assert(!m_filestore_fail_eio || r != -EIO);
4404 if (cct->_conf->filestore_debug_inject_read_err &&
4405 debug_mdata_eio(oid)) {
4406 return -EIO;
4407 } else {
4408 tracepoint(objectstore, getattr_exit, r);
4409 return r < 0 ? r : 0;
4410 }
4411 }
4412
4413 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4414 {
4415 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4416 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4417 set<string> omap_attrs;
4418 map<string, bufferlist> omap_aset;
4419 Index index;
4420 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4421 FDRef fd;
4422 bool spill_out = true;
4423 char buf[2];
4424
4425 int r = lfn_open(cid, oid, false, &fd);
4426 if (r < 0) {
4427 goto out;
4428 }
4429
4430 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4431 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4432 spill_out = false;
4433
4434 r = _fgetattrs(**fd, aset);
4435 lfn_close(fd);
4436 fd = FDRef(); // defensive
4437 if (r < 0) {
4438 goto out;
4439 }
4440
4441 if (!spill_out) {
4442 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4443 goto out;
4444 }
4445
4446 r = get_index(cid, &index);
4447 if (r < 0) {
4448 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4449 goto out;
4450 }
4451 {
4452 r = object_map->get_all_xattrs(oid, &omap_attrs);
4453 if (r < 0 && r != -ENOENT) {
4454 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4455 goto out;
4456 }
4457
4458 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4459 if (r < 0 && r != -ENOENT) {
4460 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4461 goto out;
4462 }
4463 if (r == -ENOENT)
4464 r = 0;
4465 }
4466 assert(omap_attrs.size() == omap_aset.size());
4467 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4468 i != omap_aset.end();
4469 ++i) {
4470 string key(i->first);
4471 aset.insert(make_pair(key,
4472 bufferptr(i->second.c_str(), i->second.length())));
4473 }
4474 out:
4475 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4476 assert(!m_filestore_fail_eio || r != -EIO);
4477
4478 if (cct->_conf->filestore_debug_inject_read_err &&
4479 debug_mdata_eio(oid)) {
4480 return -EIO;
4481 } else {
4482 tracepoint(objectstore, getattrs_exit, r);
4483 return r;
4484 }
4485 }
4486
4487 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4488 const SequencerPosition &spos)
4489 {
4490 map<string, bufferlist> omap_set;
4491 set<string> omap_remove;
4492 map<string, bufferptr> inline_set;
4493 map<string, bufferptr> inline_to_set;
4494 FDRef fd;
4495 int spill_out = -1;
4496 bool incomplete_inline = false;
4497
4498 int r = lfn_open(cid, oid, false, &fd);
4499 if (r < 0) {
4500 goto out;
4501 }
4502
4503 char buf[2];
4504 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4505 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4506 spill_out = 0;
4507 else
4508 spill_out = 1;
4509
4510 r = _fgetattrs(**fd, inline_set);
4511 incomplete_inline = (r == -E2BIG);
4512 assert(!m_filestore_fail_eio || r != -EIO);
4513 dout(15) << __FUNC__ << ": " << cid << "/" << oid
4514 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4515 << dendl;
4516
4517 for (map<string,bufferptr>::iterator p = aset.begin();
4518 p != aset.end();
4519 ++p) {
4520 char n[CHAIN_XATTR_MAX_NAME_LEN];
4521 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4522
4523 if (incomplete_inline) {
4524 chain_fremovexattr(**fd, n); // ignore any error
4525 omap_set[p->first].push_back(p->second);
4526 continue;
4527 }
4528
4529 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4530 if (inline_set.count(p->first)) {
4531 inline_set.erase(p->first);
4532 r = chain_fremovexattr(**fd, n);
4533 if (r < 0)
4534 goto out_close;
4535 }
4536 omap_set[p->first].push_back(p->second);
4537 continue;
4538 }
4539
4540 if (!inline_set.count(p->first) &&
4541 inline_set.size() >= m_filestore_max_inline_xattrs) {
4542 omap_set[p->first].push_back(p->second);
4543 continue;
4544 }
4545 omap_remove.insert(p->first);
4546 inline_set.insert(*p);
4547
4548 inline_to_set.insert(*p);
4549 }
4550
4551 if (spill_out != 1 && !omap_set.empty()) {
4552 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4553 sizeof(XATTR_SPILL_OUT));
4554 }
4555
4556 r = _fsetattrs(**fd, inline_to_set);
4557 if (r < 0)
4558 goto out_close;
4559
4560 if (spill_out && !omap_remove.empty()) {
4561 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4562 if (r < 0 && r != -ENOENT) {
4563 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4564 assert(!m_filestore_fail_eio || r != -EIO);
4565 goto out_close;
4566 } else {
4567 r = 0; // don't confuse the debug output
4568 }
4569 }
4570
4571 if (!omap_set.empty()) {
4572 r = object_map->set_xattrs(oid, omap_set, &spos);
4573 if (r < 0) {
4574 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4575 assert(!m_filestore_fail_eio || r != -EIO);
4576 goto out_close;
4577 }
4578 }
4579 out_close:
4580 lfn_close(fd);
4581 out:
4582 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4583 return r;
4584 }
4585
4586
4587 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4588 const SequencerPosition &spos)
4589 {
4590 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4591 FDRef fd;
4592 bool spill_out = true;
4593
4594 int r = lfn_open(cid, oid, false, &fd);
4595 if (r < 0) {
4596 goto out;
4597 }
4598
4599 char buf[2];
4600 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4601 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4602 spill_out = false;
4603 }
4604
4605 char n[CHAIN_XATTR_MAX_NAME_LEN];
4606 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4607 r = chain_fremovexattr(**fd, n);
4608 if (r == -ENODATA && spill_out) {
4609 Index index;
4610 r = get_index(cid, &index);
4611 if (r < 0) {
4612 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4613 goto out_close;
4614 }
4615 set<string> to_remove;
4616 to_remove.insert(string(name));
4617 r = object_map->remove_xattrs(oid, to_remove, &spos);
4618 if (r < 0 && r != -ENOENT) {
4619 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4620 assert(!m_filestore_fail_eio || r != -EIO);
4621 goto out_close;
4622 }
4623 }
4624 out_close:
4625 lfn_close(fd);
4626 out:
4627 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4628 return r;
4629 }
4630
4631 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4632 const SequencerPosition &spos)
4633 {
4634 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4635
4636 map<string,bufferptr> aset;
4637 FDRef fd;
4638 set<string> omap_attrs;
4639 Index index;
4640 bool spill_out = true;
4641
4642 int r = lfn_open(cid, oid, false, &fd);
4643 if (r < 0) {
4644 goto out;
4645 }
4646
4647 char buf[2];
4648 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4649 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4650 spill_out = false;
4651 }
4652
4653 r = _fgetattrs(**fd, aset);
4654 if (r >= 0) {
4655 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4656 char n[CHAIN_XATTR_MAX_NAME_LEN];
4657 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4658 r = chain_fremovexattr(**fd, n);
4659 if (r < 0) {
4660 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4661 goto out_close;
4662 }
4663 }
4664 }
4665
4666 if (!spill_out) {
4667 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4668 goto out_close;
4669 }
4670
4671 r = get_index(cid, &index);
4672 if (r < 0) {
4673 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4674 goto out_close;
4675 }
4676 {
4677 r = object_map->get_all_xattrs(oid, &omap_attrs);
4678 if (r < 0 && r != -ENOENT) {
4679 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4680 assert(!m_filestore_fail_eio || r != -EIO);
4681 goto out_close;
4682 }
4683 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4684 if (r < 0 && r != -ENOENT) {
4685 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4686 goto out_close;
4687 }
4688 if (r == -ENOENT)
4689 r = 0;
4690 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4691 sizeof(XATTR_NO_SPILL_OUT));
4692 }
4693
4694 out_close:
4695 lfn_close(fd);
4696 out:
4697 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4698 return r;
4699 }
4700
4701
4702
4703
4704 int FileStore::_collection_remove_recursive(const coll_t &cid,
4705 const SequencerPosition &spos)
4706 {
4707 struct stat st;
4708 int r = collection_stat(cid, &st);
4709 if (r < 0) {
4710 if (r == -ENOENT)
4711 return 0;
4712 return r;
4713 }
4714
4715 vector<ghobject_t> objects;
4716 ghobject_t max;
4717 while (!max.is_max()) {
4718 r = collection_list(cid, max, ghobject_t::get_max(),
4719 300, &objects, &max);
4720 if (r < 0)
4721 return r;
4722 for (vector<ghobject_t>::iterator i = objects.begin();
4723 i != objects.end();
4724 ++i) {
4725 assert(_check_replay_guard(cid, *i, spos));
4726 r = _remove(cid, *i, spos);
4727 if (r < 0)
4728 return r;
4729 }
4730 objects.clear();
4731 }
4732 return _destroy_collection(cid);
4733 }
4734
4735 // --------------------------
4736 // collections
4737
4738 int FileStore::list_collections(vector<coll_t>& ls)
4739 {
4740 return list_collections(ls, false);
4741 }
4742
4743 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4744 {
4745 tracepoint(objectstore, list_collections_enter);
4746 dout(10) << __FUNC__ << dendl;
4747
4748 char fn[PATH_MAX];
4749 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4750
4751 int r = 0;
4752 DIR *dir = ::opendir(fn);
4753 if (!dir) {
4754 r = -errno;
4755 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4756 assert(!m_filestore_fail_eio || r != -EIO);
4757 return r;
4758 }
4759
4760 struct dirent *de = nullptr;
4761 while ((de = ::readdir(dir))) {
4762 if (de->d_type == DT_UNKNOWN) {
4763 // d_type not supported (non-ext[234], btrfs), must stat
4764 struct stat sb;
4765 char filename[PATH_MAX];
4766 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4767
4768 r = ::stat(filename, &sb);
4769 if (r < 0) {
4770 r = -errno;
4771 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4772 assert(!m_filestore_fail_eio || r != -EIO);
4773 break;
4774 }
4775 if (!S_ISDIR(sb.st_mode)) {
4776 continue;
4777 }
4778 } else if (de->d_type != DT_DIR) {
4779 continue;
4780 }
4781 if (strcmp(de->d_name, "omap") == 0) {
4782 continue;
4783 }
4784 if (de->d_name[0] == '.' &&
4785 (de->d_name[1] == '\0' ||
4786 (de->d_name[1] == '.' &&
4787 de->d_name[2] == '\0')))
4788 continue;
4789 coll_t cid;
4790 if (!cid.parse(de->d_name)) {
4791 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4792 continue;
4793 }
4794 if (!cid.is_temp() || include_temp)
4795 ls.push_back(cid);
4796 }
4797
4798 if (r > 0) {
4799 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4800 r = -r;
4801 }
4802
4803 ::closedir(dir);
4804 assert(!m_filestore_fail_eio || r != -EIO);
4805 tracepoint(objectstore, list_collections_exit, r);
4806 return r;
4807 }
4808
4809 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4810 {
4811 tracepoint(objectstore, collection_stat_enter, c.c_str());
4812 char fn[PATH_MAX];
4813 get_cdir(c, fn, sizeof(fn));
4814 dout(15) << __FUNC__ << ": " << fn << dendl;
4815 int r = ::stat(fn, st);
4816 if (r < 0)
4817 r = -errno;
4818 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
4819 assert(!m_filestore_fail_eio || r != -EIO);
4820 tracepoint(objectstore, collection_stat_exit, r);
4821 return r;
4822 }
4823
4824 bool FileStore::collection_exists(const coll_t& c)
4825 {
4826 tracepoint(objectstore, collection_exists_enter, c.c_str());
4827 struct stat st;
4828 bool ret = collection_stat(c, &st) == 0;
4829 tracepoint(objectstore, collection_exists_exit, ret);
4830 return ret;
4831 }
4832
4833 int FileStore::collection_empty(const coll_t& c, bool *empty)
4834 {
4835 tracepoint(objectstore, collection_empty_enter, c.c_str());
4836 dout(15) << __FUNC__ << ": " << c << dendl;
4837 Index index;
4838 int r = get_index(c, &index);
4839 if (r < 0) {
4840 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
4841 << dendl;
4842 return r;
4843 }
4844
4845 assert(NULL != index.index);
4846 RWLock::RLocker l((index.index)->access_lock);
4847
4848 vector<ghobject_t> ls;
4849 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4850 1, &ls, NULL);
4851 if (r < 0) {
4852 derr << __FUNC__ << ": collection_list_partial returned: "
4853 << cpp_strerror(r) << dendl;
4854 assert(!m_filestore_fail_eio || r != -EIO);
4855 return r;
4856 }
4857 *empty = ls.empty();
4858 tracepoint(objectstore, collection_empty_exit, *empty);
4859 return 0;
4860 }
4861
4862 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4863 {
4864 char fn[PATH_MAX];
4865 get_cdir(c, fn, sizeof(fn));
4866 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
4867 char n[PATH_MAX];
4868 int r;
4869 int32_t v = bits;
4870 int fd = ::open(fn, O_RDONLY);
4871 if (fd < 0) {
4872 r = -errno;
4873 goto out;
4874 }
4875 get_attrname("bits", n, PATH_MAX);
4876 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4877 VOID_TEMP_FAILURE_RETRY(::close(fd));
4878 out:
4879 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
4880 return r;
4881 }
4882
4883 int FileStore::collection_bits(const coll_t& c)
4884 {
4885 char fn[PATH_MAX];
4886 get_cdir(c, fn, sizeof(fn));
4887 dout(15) << __FUNC__ << ": " << fn << dendl;
4888 int r;
4889 char n[PATH_MAX];
4890 int32_t bits;
4891 int fd = ::open(fn, O_RDONLY);
4892 if (fd < 0) {
4893 bits = r = -errno;
4894 goto out;
4895 }
4896 get_attrname("bits", n, PATH_MAX);
4897 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4898 VOID_TEMP_FAILURE_RETRY(::close(fd));
4899 if (r < 0) {
4900 bits = r;
4901 goto out;
4902 }
4903 out:
4904 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
4905 return bits;
4906 }
4907
4908 int FileStore::collection_list(const coll_t& c,
4909 const ghobject_t& orig_start,
4910 const ghobject_t& end,
4911 int max,
4912 vector<ghobject_t> *ls, ghobject_t *next)
4913 {
4914 ghobject_t start = orig_start;
4915 if (start.is_max())
4916 return 0;
4917
4918 ghobject_t temp_next;
4919 if (!next)
4920 next = &temp_next;
4921 // figure out the pool id. we need this in order to generate a
4922 // meaningful 'next' value.
4923 int64_t pool = -1;
4924 shard_id_t shard;
4925 {
4926 spg_t pgid;
4927 if (c.is_temp(&pgid)) {
4928 pool = -2 - pgid.pool();
4929 shard = pgid.shard;
4930 } else if (c.is_pg(&pgid)) {
4931 pool = pgid.pool();
4932 shard = pgid.shard;
4933 } else if (c.is_meta()) {
4934 pool = -1;
4935 shard = shard_id_t::NO_SHARD;
4936 } else {
4937 // hrm, the caller is test code! we should get kill it off. for now,
4938 // tolerate it.
4939 pool = 0;
4940 shard = shard_id_t::NO_SHARD;
4941 }
4942 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
4943 << " pgid " << pgid << dendl;
4944 }
4945 ghobject_t sep;
4946 sep.hobj.pool = -1;
4947 sep.set_shard(shard);
4948 if (!c.is_temp() && !c.is_meta()) {
4949 if (start < sep) {
4950 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
4951 coll_t temp = c.get_temp();
4952 int r = collection_list(temp, start, end, max, ls, next);
4953 if (r < 0)
4954 return r;
4955 if (*next != ghobject_t::get_max())
4956 return r;
4957 start = sep;
4958 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
4959 << start << dendl;
4960 } else {
4961 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
4962 }
4963 }
4964
4965 Index index;
4966 int r = get_index(c, &index);
4967 if (r < 0)
4968 return r;
4969
4970 assert(NULL != index.index);
4971 RWLock::RLocker l((index.index)->access_lock);
4972
4973 r = index->collection_list_partial(start, end, max, ls, next);
4974
4975 if (r < 0) {
4976 assert(!m_filestore_fail_eio || r != -EIO);
4977 return r;
4978 }
4979 dout(20) << "objects: " << *ls << dendl;
4980
4981 // HashIndex doesn't know the pool when constructing a 'next' value
4982 if (next && !next->is_max()) {
4983 next->hobj.pool = pool;
4984 next->set_shard(shard);
4985 dout(20) << " next " << *next << dendl;
4986 }
4987
4988 return 0;
4989 }
4990
4991 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
4992 bufferlist *header,
4993 map<string, bufferlist> *out)
4994 {
4995 tracepoint(objectstore, omap_get_enter, _c.c_str());
4996 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
4997 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
4998 Index index;
4999 int r = get_index(c, &index);
5000 if (r < 0)
5001 return r;
5002 {
5003 assert(NULL != index.index);
5004 RWLock::RLocker l((index.index)->access_lock);
5005 r = lfn_find(hoid, index);
5006 if (r < 0)
5007 return r;
5008 }
5009 r = object_map->get(hoid, header, out);
5010 if (r < 0 && r != -ENOENT) {
5011 assert(!m_filestore_fail_eio || r != -EIO);
5012 return r;
5013 }
5014 tracepoint(objectstore, omap_get_exit, 0);
5015 return 0;
5016 }
5017
5018 int FileStore::omap_get_header(
5019 const coll_t& _c,
5020 const ghobject_t &hoid,
5021 bufferlist *bl,
5022 bool allow_eio)
5023 {
5024 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5025 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5026 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5027 Index index;
5028 int r = get_index(c, &index);
5029 if (r < 0)
5030 return r;
5031 {
5032 assert(NULL != index.index);
5033 RWLock::RLocker l((index.index)->access_lock);
5034 r = lfn_find(hoid, index);
5035 if (r < 0)
5036 return r;
5037 }
5038 r = object_map->get_header(hoid, bl);
5039 if (r < 0 && r != -ENOENT) {
5040 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5041 return r;
5042 }
5043 tracepoint(objectstore, omap_get_header_exit, 0);
5044 return 0;
5045 }
5046
5047 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5048 {
5049 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5050 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5051 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5052 Index index;
5053 int r = get_index(c, &index);
5054 if (r < 0)
5055 return r;
5056 {
5057 assert(NULL != index.index);
5058 RWLock::RLocker l((index.index)->access_lock);
5059 r = lfn_find(hoid, index);
5060 if (r < 0)
5061 return r;
5062 }
5063 r = object_map->get_keys(hoid, keys);
5064 if (r < 0 && r != -ENOENT) {
5065 assert(!m_filestore_fail_eio || r != -EIO);
5066 return r;
5067 }
5068 tracepoint(objectstore, omap_get_keys_exit, 0);
5069 return 0;
5070 }
5071
5072 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5073 const set<string> &keys,
5074 map<string, bufferlist> *out)
5075 {
5076 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5077 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5078 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5079 Index index;
5080 const char *where = "()";
5081 int r = get_index(c, &index);
5082 if (r < 0) {
5083 where = " (get_index)";
5084 goto out;
5085 }
5086 {
5087 assert(NULL != index.index);
5088 RWLock::RLocker l((index.index)->access_lock);
5089 r = lfn_find(hoid, index);
5090 if (r < 0) {
5091 where = " (lfn_find)";
5092 goto out;
5093 }
5094 }
5095 r = object_map->get_values(hoid, keys, out);
5096 if (r < 0 && r != -ENOENT) {
5097 assert(!m_filestore_fail_eio || r != -EIO);
5098 where = " (get_values)";
5099 goto out;
5100 }
5101 r = 0;
5102 out:
5103 tracepoint(objectstore, omap_get_values_exit, r);
5104 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5105 << where << dendl;
5106 return r;
5107 }
5108
5109 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5110 const set<string> &keys,
5111 set<string> *out)
5112 {
5113 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5114 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5115 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5116
5117 Index index;
5118 int r = get_index(c, &index);
5119 if (r < 0)
5120 return r;
5121 {
5122 assert(NULL != index.index);
5123 RWLock::RLocker l((index.index)->access_lock);
5124 r = lfn_find(hoid, index);
5125 if (r < 0)
5126 return r;
5127 }
5128 r = object_map->check_keys(hoid, keys, out);
5129 if (r < 0 && r != -ENOENT) {
5130 assert(!m_filestore_fail_eio || r != -EIO);
5131 return r;
5132 }
5133 tracepoint(objectstore, omap_check_keys_exit, 0);
5134 return 0;
5135 }
5136
5137 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5138 const ghobject_t &hoid)
5139 {
5140 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5141 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5142 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5143 Index index;
5144 int r = get_index(c, &index);
5145 if (r < 0) {
5146 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5147 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5148 return ObjectMap::ObjectMapIterator();
5149 }
5150 {
5151 assert(NULL != index.index);
5152 RWLock::RLocker l((index.index)->access_lock);
5153 r = lfn_find(hoid, index);
5154 if (r < 0) {
5155 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5156 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5157 return ObjectMap::ObjectMapIterator();
5158 }
5159 }
5160 return object_map->get_iterator(hoid);
5161 }
5162
5163 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5164 uint64_t expected_num_objs,
5165 const SequencerPosition &spos)
5166 {
5167 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5168 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5169
5170 bool empty;
5171 int ret = collection_empty(c, &empty);
5172 if (ret < 0)
5173 return ret;
5174 if (!empty && !replaying) {
5175 dout(0) << "Failed to give an expected number of objects hint to collection : "
5176 << c << ", only empty collection can take such type of hint. " << dendl;
5177 return 0;
5178 }
5179
5180 Index index;
5181 ret = get_index(c, &index);
5182 if (ret < 0)
5183 return ret;
5184 // Pre-hash the collection
5185 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5186 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5187 if (ret < 0)
5188 return ret;
5189 _set_replay_guard(c, spos);
5190
5191 return 0;
5192 }
5193
5194 int FileStore::_create_collection(
5195 const coll_t& c,
5196 int bits,
5197 const SequencerPosition &spos)
5198 {
5199 char fn[PATH_MAX];
5200 get_cdir(c, fn, sizeof(fn));
5201 dout(15) << __FUNC__ << ": " << fn << dendl;
5202 int r = ::mkdir(fn, 0755);
5203 if (r < 0)
5204 r = -errno;
5205 if (r == -EEXIST && replaying)
5206 r = 0;
5207 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5208
5209 if (r < 0)
5210 return r;
5211 r = init_index(c);
5212 if (r < 0)
5213 return r;
5214 r = _collection_set_bits(c, bits);
5215 if (r < 0)
5216 return r;
5217 // create parallel temp collection, too
5218 if (!c.is_meta() && !c.is_temp()) {
5219 coll_t temp = c.get_temp();
5220 r = _create_collection(temp, 0, spos);
5221 if (r < 0)
5222 return r;
5223 }
5224
5225 _set_replay_guard(c, spos);
5226 return 0;
5227 }
5228
5229 int FileStore::_destroy_collection(const coll_t& c)
5230 {
5231 int r = 0;
5232 char fn[PATH_MAX];
5233 get_cdir(c, fn, sizeof(fn));
5234 dout(15) << __FUNC__ << ": " << fn << dendl;
5235 {
5236 Index from;
5237 r = get_index(c, &from);
5238 if (r < 0)
5239 goto out;
5240 assert(NULL != from.index);
5241 RWLock::WLocker l((from.index)->access_lock);
5242
5243 r = from->prep_delete();
5244 if (r < 0)
5245 goto out;
5246 }
5247 r = ::rmdir(fn);
5248 if (r < 0) {
5249 r = -errno;
5250 goto out;
5251 }
5252
5253 out:
5254 // destroy parallel temp collection, too
5255 if (!c.is_meta() && !c.is_temp()) {
5256 coll_t temp = c.get_temp();
5257 int r2 = _destroy_collection(temp);
5258 if (r2 < 0) {
5259 r = r2;
5260 goto out_final;
5261 }
5262 }
5263
5264 out_final:
5265 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5266 return r;
5267 }
5268
5269
5270 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5271 const SequencerPosition& spos)
5272 {
5273 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5274
5275 int dstcmp = _check_replay_guard(c, o, spos);
5276 if (dstcmp < 0)
5277 return 0;
5278
5279 // check the src name too; it might have a newer guard, and we don't
5280 // want to clobber it
5281 int srccmp = _check_replay_guard(oldcid, o, spos);
5282 if (srccmp < 0)
5283 return 0;
5284
5285 // open guard on object so we don't any previous operations on the
5286 // new name that will modify the source inode.
5287 FDRef fd;
5288 int r = lfn_open(oldcid, o, 0, &fd);
5289 if (r < 0) {
5290 // the source collection/object does not exist. If we are replaying, we
5291 // should be safe, so just return 0 and move on.
5292 assert(replaying);
5293 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5294 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5295 return 0;
5296 }
5297 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5298 _set_replay_guard(**fd, spos, &o, true);
5299 }
5300
5301 r = lfn_link(oldcid, c, o, o);
5302 if (replaying && !backend->can_checkpoint() &&
5303 r == -EEXIST) // crashed between link() and set_replay_guard()
5304 r = 0;
5305
5306 _inject_failure();
5307
5308 // close guard on object so we don't do this again
5309 if (r == 0) {
5310 _close_replay_guard(**fd, spos);
5311 }
5312 lfn_close(fd);
5313
5314 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5315 return r;
5316 }
5317
5318 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5319 coll_t c, const ghobject_t& o,
5320 const SequencerPosition& spos,
5321 bool allow_enoent)
5322 {
5323 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5324 int r = 0;
5325 int dstcmp, srccmp;
5326
5327 if (replaying) {
5328 /* If the destination collection doesn't exist during replay,
5329 * we need to delete the src object and continue on
5330 */
5331 if (!collection_exists(c))
5332 goto out_rm_src;
5333 }
5334
5335 dstcmp = _check_replay_guard(c, o, spos);
5336 if (dstcmp < 0)
5337 goto out_rm_src;
5338
5339 // check the src name too; it might have a newer guard, and we don't
5340 // want to clobber it
5341 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5342 if (srccmp < 0)
5343 return 0;
5344
5345 {
5346 // open guard on object so we don't any previous operations on the
5347 // new name that will modify the source inode.
5348 FDRef fd;
5349 r = lfn_open(oldcid, oldoid, 0, &fd);
5350 if (r < 0) {
5351 // the source collection/object does not exist. If we are replaying, we
5352 // should be safe, so just return 0 and move on.
5353 if (replaying) {
5354 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5355 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5356 } else if (allow_enoent) {
5357 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5358 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5359 << dendl;
5360 } else {
5361 assert(0 == "ERROR: source must exist");
5362 }
5363
5364 if (!replaying) {
5365 return 0;
5366 }
5367 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5368 return 0;
5369 }
5370
5371 r = 0; // don't know if object_map was cloned
5372 } else {
5373 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5374 _set_replay_guard(**fd, spos, &o, true);
5375 }
5376
5377 r = lfn_link(oldcid, c, oldoid, o);
5378 if (replaying && !backend->can_checkpoint() &&
5379 r == -EEXIST) // crashed between link() and set_replay_guard()
5380 r = 0;
5381
5382 lfn_close(fd);
5383 fd = FDRef();
5384
5385 _inject_failure();
5386 }
5387
5388 if (r == 0) {
5389 // the name changed; link the omap content
5390 r = object_map->rename(oldoid, o, &spos);
5391 if (r == -ENOENT)
5392 r = 0;
5393 }
5394
5395 _inject_failure();
5396
5397 if (r == 0)
5398 r = lfn_unlink(oldcid, oldoid, spos, true);
5399
5400 if (r == 0)
5401 r = lfn_open(c, o, 0, &fd);
5402
5403 // close guard on object so we don't do this again
5404 if (r == 0) {
5405 _close_replay_guard(**fd, spos, &o);
5406 lfn_close(fd);
5407 }
5408 }
5409
5410 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5411 << " = " << r << dendl;
5412 return r;
5413
5414 out_rm_src:
5415 // remove source
5416 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5417 r = lfn_unlink(oldcid, oldoid, spos, true);
5418 }
5419
5420 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5421 << " = " << r << dendl;
5422 return r;
5423 }
5424
5425 void FileStore::_inject_failure()
5426 {
5427 if (m_filestore_kill_at) {
5428 int final = --m_filestore_kill_at;
5429 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5430 if (final == 0) {
5431 derr << __FUNC__ << ": KILLING" << dendl;
5432 cct->_log->flush();
5433 _exit(1);
5434 }
5435 }
5436 }
5437
5438 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5439 const SequencerPosition &spos) {
5440 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5441 Index index;
5442 int r = get_index(cid, &index);
5443 if (r < 0)
5444 return r;
5445 {
5446 assert(NULL != index.index);
5447 RWLock::RLocker l((index.index)->access_lock);
5448 r = lfn_find(hoid, index);
5449 if (r < 0)
5450 return r;
5451 }
5452 r = object_map->clear_keys_header(hoid, &spos);
5453 if (r < 0 && r != -ENOENT)
5454 return r;
5455 return 0;
5456 }
5457
5458 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5459 const map<string, bufferlist> &aset,
5460 const SequencerPosition &spos) {
5461 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5462 Index index;
5463 int r;
5464 //treat pgmeta as a logical object, skip to check exist
5465 if (hoid.is_pgmeta())
5466 goto skip;
5467
5468 r = get_index(cid, &index);
5469 if (r < 0) {
5470 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5471 return r;
5472 }
5473 {
5474 assert(NULL != index.index);
5475 RWLock::RLocker l((index.index)->access_lock);
5476 r = lfn_find(hoid, index);
5477 if (r < 0) {
5478 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5479 return r;
5480 }
5481 }
5482 skip:
5483 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5484 for (auto& p : aset) {
5485 dout(20) << __FUNC__ << ": set " << p.first << dendl;
5486 }
5487 }
5488 r = object_map->set_keys(hoid, aset, &spos);
5489 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5490 return r;
5491 }
5492
5493 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5494 const set<string> &keys,
5495 const SequencerPosition &spos) {
5496 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5497 Index index;
5498 int r;
5499 //treat pgmeta as a logical object, skip to check exist
5500 if (hoid.is_pgmeta())
5501 goto skip;
5502
5503 r = get_index(cid, &index);
5504 if (r < 0)
5505 return r;
5506 {
5507 assert(NULL != index.index);
5508 RWLock::RLocker l((index.index)->access_lock);
5509 r = lfn_find(hoid, index);
5510 if (r < 0)
5511 return r;
5512 }
5513 skip:
5514 r = object_map->rm_keys(hoid, keys, &spos);
5515 if (r < 0 && r != -ENOENT)
5516 return r;
5517 return 0;
5518 }
5519
5520 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5521 const string& first, const string& last,
5522 const SequencerPosition &spos) {
5523 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5524 set<string> keys;
5525 {
5526 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5527 if (!iter)
5528 return -ENOENT;
5529 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5530 iter->next()) {
5531 keys.insert(iter->key());
5532 }
5533 }
5534 return _omap_rmkeys(cid, hoid, keys, spos);
5535 }
5536
5537 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5538 const bufferlist &bl,
5539 const SequencerPosition &spos)
5540 {
5541 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5542 Index index;
5543 int r = get_index(cid, &index);
5544 if (r < 0)
5545 return r;
5546 {
5547 assert(NULL != index.index);
5548 RWLock::RLocker l((index.index)->access_lock);
5549 r = lfn_find(hoid, index);
5550 if (r < 0)
5551 return r;
5552 }
5553 return object_map->set_header(hoid, bl, &spos);
5554 }
5555
5556 int FileStore::_split_collection(const coll_t& cid,
5557 uint32_t bits,
5558 uint32_t rem,
5559 coll_t dest,
5560 const SequencerPosition &spos)
5561 {
5562 int r;
5563 {
5564 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5565 if (!collection_exists(cid)) {
5566 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5567 assert(replaying);
5568 return 0;
5569 }
5570 if (!collection_exists(dest)) {
5571 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5572 assert(replaying);
5573 return 0;
5574 }
5575
5576 int dstcmp = _check_replay_guard(dest, spos);
5577 if (dstcmp < 0)
5578 return 0;
5579
5580 int srccmp = _check_replay_guard(cid, spos);
5581 if (srccmp < 0)
5582 return 0;
5583
5584 _set_global_replay_guard(cid, spos);
5585 _set_replay_guard(cid, spos, true);
5586 _set_replay_guard(dest, spos, true);
5587
5588 Index from;
5589 r = get_index(cid, &from);
5590
5591 Index to;
5592 if (!r)
5593 r = get_index(dest, &to);
5594
5595 if (!r) {
5596 assert(NULL != from.index);
5597 RWLock::WLocker l1((from.index)->access_lock);
5598
5599 assert(NULL != to.index);
5600 RWLock::WLocker l2((to.index)->access_lock);
5601
5602 r = from->split(rem, bits, to.index);
5603 }
5604
5605 _close_replay_guard(cid, spos);
5606 _close_replay_guard(dest, spos);
5607 }
5608 _collection_set_bits(cid, bits);
5609 if (!r && cct->_conf->filestore_debug_verify_split) {
5610 vector<ghobject_t> objects;
5611 ghobject_t next;
5612 while (1) {
5613 collection_list(
5614 cid,
5615 next, ghobject_t::get_max(),
5616 get_ideal_list_max(),
5617 &objects,
5618 &next);
5619 if (objects.empty())
5620 break;
5621 for (vector<ghobject_t>::iterator i = objects.begin();
5622 i != objects.end();
5623 ++i) {
5624 dout(20) << __FUNC__ << ": " << *i << " still in source "
5625 << cid << dendl;
5626 assert(!i->match(bits, rem));
5627 }
5628 objects.clear();
5629 }
5630 next = ghobject_t();
5631 while (1) {
5632 collection_list(
5633 dest,
5634 next, ghobject_t::get_max(),
5635 get_ideal_list_max(),
5636 &objects,
5637 &next);
5638 if (objects.empty())
5639 break;
5640 for (vector<ghobject_t>::iterator i = objects.begin();
5641 i != objects.end();
5642 ++i) {
5643 dout(20) << __FUNC__ << ": " << *i << " now in dest "
5644 << *i << dendl;
5645 assert(i->match(bits, rem));
5646 }
5647 objects.clear();
5648 }
5649 }
5650 return r;
5651 }
5652
5653 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5654 uint64_t expected_object_size,
5655 uint64_t expected_write_size)
5656 {
5657 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5658
5659 FDRef fd;
5660 int ret = 0;
5661
5662 if (expected_object_size == 0 || expected_write_size == 0)
5663 goto out;
5664
5665 ret = lfn_open(cid, oid, false, &fd);
5666 if (ret < 0)
5667 goto out;
5668
5669 {
5670 // TODO: a more elaborate hint calculation
5671 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5672
5673 ret = backend->set_alloc_hint(**fd, hint);
5674 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
5675 }
5676
5677 lfn_close(fd);
5678 out:
5679 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5680 assert(!m_filestore_fail_eio || ret != -EIO);
5681 return ret;
5682 }
5683
5684 const char** FileStore::get_tracked_conf_keys() const
5685 {
5686 static const char* KEYS[] = {
5687 "filestore_max_inline_xattr_size",
5688 "filestore_max_inline_xattr_size_xfs",
5689 "filestore_max_inline_xattr_size_btrfs",
5690 "filestore_max_inline_xattr_size_other",
5691 "filestore_max_inline_xattrs",
5692 "filestore_max_inline_xattrs_xfs",
5693 "filestore_max_inline_xattrs_btrfs",
5694 "filestore_max_inline_xattrs_other",
5695 "filestore_max_xattr_value_size",
5696 "filestore_max_xattr_value_size_xfs",
5697 "filestore_max_xattr_value_size_btrfs",
5698 "filestore_max_xattr_value_size_other",
5699 "filestore_min_sync_interval",
5700 "filestore_max_sync_interval",
5701 "filestore_queue_max_ops",
5702 "filestore_queue_max_bytes",
5703 "filestore_expected_throughput_bytes",
5704 "filestore_expected_throughput_ops",
5705 "filestore_queue_low_threshhold",
5706 "filestore_queue_high_threshhold",
5707 "filestore_queue_high_delay_multiple",
5708 "filestore_queue_max_delay_multiple",
5709 "filestore_commit_timeout",
5710 "filestore_dump_file",
5711 "filestore_kill_at",
5712 "filestore_fail_eio",
5713 "filestore_fadvise",
5714 "filestore_sloppy_crc",
5715 "filestore_sloppy_crc_block_size",
5716 "filestore_max_alloc_hint_size",
5717 NULL
5718 };
5719 return KEYS;
5720 }
5721
5722 void FileStore::handle_conf_change(const struct md_config_t *conf,
5723 const std::set <std::string> &changed)
5724 {
5725 if (changed.count("filestore_max_inline_xattr_size") ||
5726 changed.count("filestore_max_inline_xattr_size_xfs") ||
5727 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5728 changed.count("filestore_max_inline_xattr_size_other") ||
5729 changed.count("filestore_max_inline_xattrs") ||
5730 changed.count("filestore_max_inline_xattrs_xfs") ||
5731 changed.count("filestore_max_inline_xattrs_btrfs") ||
5732 changed.count("filestore_max_inline_xattrs_other") ||
5733 changed.count("filestore_max_xattr_value_size") ||
5734 changed.count("filestore_max_xattr_value_size_xfs") ||
5735 changed.count("filestore_max_xattr_value_size_btrfs") ||
5736 changed.count("filestore_max_xattr_value_size_other")) {
5737 if (backend) {
5738 Mutex::Locker l(lock);
5739 set_xattr_limits_via_conf();
5740 }
5741 }
5742
5743 if (changed.count("filestore_queue_max_bytes") ||
5744 changed.count("filestore_queue_max_ops") ||
5745 changed.count("filestore_expected_throughput_bytes") ||
5746 changed.count("filestore_expected_throughput_ops") ||
5747 changed.count("filestore_queue_low_threshhold") ||
5748 changed.count("filestore_queue_high_threshhold") ||
5749 changed.count("filestore_queue_high_delay_multiple") ||
5750 changed.count("filestore_queue_max_delay_multiple")) {
5751 Mutex::Locker l(lock);
5752 set_throttle_params();
5753 }
5754
5755 if (changed.count("filestore_min_sync_interval") ||
5756 changed.count("filestore_max_sync_interval") ||
5757 changed.count("filestore_kill_at") ||
5758 changed.count("filestore_fail_eio") ||
5759 changed.count("filestore_sloppy_crc") ||
5760 changed.count("filestore_sloppy_crc_block_size") ||
5761 changed.count("filestore_max_alloc_hint_size") ||
5762 changed.count("filestore_fadvise")) {
5763 Mutex::Locker l(lock);
5764 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5765 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5766 m_filestore_kill_at = conf->filestore_kill_at;
5767 m_filestore_fail_eio = conf->filestore_fail_eio;
5768 m_filestore_fadvise = conf->filestore_fadvise;
5769 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5770 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5771 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5772 }
5773 if (changed.count("filestore_commit_timeout")) {
5774 Mutex::Locker l(sync_entry_timeo_lock);
5775 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5776 }
5777 if (changed.count("filestore_dump_file")) {
5778 if (conf->filestore_dump_file.length() &&
5779 conf->filestore_dump_file != "-") {
5780 dump_start(conf->filestore_dump_file);
5781 } else {
5782 dump_stop();
5783 }
5784 }
5785 }
5786
5787 int FileStore::set_throttle_params()
5788 {
5789 stringstream ss;
5790 bool valid = throttle_bytes.set_params(
5791 cct->_conf->filestore_queue_low_threshhold,
5792 cct->_conf->filestore_queue_high_threshhold,
5793 cct->_conf->filestore_expected_throughput_bytes,
5794 cct->_conf->filestore_queue_high_delay_multiple,
5795 cct->_conf->filestore_queue_max_delay_multiple,
5796 cct->_conf->filestore_queue_max_bytes,
5797 &ss);
5798
5799 valid &= throttle_ops.set_params(
5800 cct->_conf->filestore_queue_low_threshhold,
5801 cct->_conf->filestore_queue_high_threshhold,
5802 cct->_conf->filestore_expected_throughput_ops,
5803 cct->_conf->filestore_queue_high_delay_multiple,
5804 cct->_conf->filestore_queue_max_delay_multiple,
5805 cct->_conf->filestore_queue_max_ops,
5806 &ss);
5807
5808 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5809 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5810
5811 if (!valid) {
5812 derr << "tried to set invalid params: "
5813 << ss.str()
5814 << dendl;
5815 }
5816 return valid ? 0 : -EINVAL;
5817 }
5818
5819 void FileStore::dump_start(const std::string& file)
5820 {
5821 dout(10) << __FUNC__ << ": " << file << dendl;
5822 if (m_filestore_do_dump) {
5823 dump_stop();
5824 }
5825 m_filestore_dump_fmt.reset();
5826 m_filestore_dump_fmt.open_array_section("dump");
5827 m_filestore_dump.open(file.c_str());
5828 m_filestore_do_dump = true;
5829 }
5830
5831 void FileStore::dump_stop()
5832 {
5833 dout(10) << __FUNC__ << dendl;
5834 m_filestore_do_dump = false;
5835 if (m_filestore_dump.is_open()) {
5836 m_filestore_dump_fmt.close_section();
5837 m_filestore_dump_fmt.flush(m_filestore_dump);
5838 m_filestore_dump.flush();
5839 m_filestore_dump.close();
5840 }
5841 }
5842
5843 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5844 {
5845 m_filestore_dump_fmt.open_array_section("transactions");
5846 unsigned trans_num = 0;
5847 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5848 m_filestore_dump_fmt.open_object_section("transaction");
5849 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5850 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5851 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5852 (*i).dump(&m_filestore_dump_fmt);
5853 m_filestore_dump_fmt.close_section();
5854 }
5855 m_filestore_dump_fmt.close_section();
5856 m_filestore_dump_fmt.flush(m_filestore_dump);
5857 m_filestore_dump.flush();
5858 }
5859
5860 void FileStore::set_xattr_limits_via_conf()
5861 {
5862 uint32_t fs_xattr_size;
5863 uint32_t fs_xattrs;
5864 uint32_t fs_xattr_max_value_size;
5865
5866 switch (m_fs_type) {
5867 #if defined(__linux__)
5868 case XFS_SUPER_MAGIC:
5869 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5870 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5871 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5872 break;
5873 case BTRFS_SUPER_MAGIC:
5874 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5875 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5876 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5877 break;
5878 #endif
5879 default:
5880 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5881 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5882 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5883 break;
5884 }
5885
5886 // Use override value if set
5887 if (cct->_conf->filestore_max_inline_xattr_size)
5888 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5889 else
5890 m_filestore_max_inline_xattr_size = fs_xattr_size;
5891
5892 // Use override value if set
5893 if (cct->_conf->filestore_max_inline_xattrs)
5894 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5895 else
5896 m_filestore_max_inline_xattrs = fs_xattrs;
5897
5898 // Use override value if set
5899 if (cct->_conf->filestore_max_xattr_value_size)
5900 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5901 else
5902 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5903
5904 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5905 derr << "WARNING: max attr value size ("
5906 << m_filestore_max_xattr_value_size
5907 << ") is smaller than osd_max_object_name_len ("
5908 << cct->_conf->osd_max_object_name_len
5909 << "). Your backend filesystem appears to not support attrs large "
5910 << "enough to handle the configured max rados name size. You may get "
5911 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5912 << "behavior"
5913 << dendl;
5914 }
5915 }
5916
5917 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5918 {
5919 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5920 return res;
5921 }
5922
5923 int FileStore::apply_layout_settings(const coll_t &cid)
5924 {
5925 dout(20) << __FUNC__ << ": " << cid << dendl;
5926 Index index;
5927 int r = get_index(cid, &index);
5928 if (r < 0) {
5929 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5930 << dendl;
5931 return r;
5932 }
5933
5934 return index->apply_layout_settings();
5935 }
5936
5937
5938 // -- FSSuperblock --
5939
5940 void FSSuperblock::encode(bufferlist &bl) const
5941 {
5942 ENCODE_START(2, 1, bl);
5943 compat_features.encode(bl);
5944 ::encode(omap_backend, bl);
5945 ENCODE_FINISH(bl);
5946 }
5947
5948 void FSSuperblock::decode(bufferlist::iterator &bl)
5949 {
5950 DECODE_START(2, bl);
5951 compat_features.decode(bl);
5952 if (struct_v >= 2)
5953 ::decode(omap_backend, bl);
5954 else
5955 omap_backend = "leveldb";
5956 DECODE_FINISH(bl);
5957 }
5958
5959 void FSSuperblock::dump(Formatter *f) const
5960 {
5961 f->open_object_section("compat");
5962 compat_features.dump(f);
5963 f->dump_string("omap_backend", omap_backend);
5964 f->close_section();
5965 }
5966
5967 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
5968 {
5969 FSSuperblock z;
5970 o.push_back(new FSSuperblock(z));
5971 CompatSet::FeatureSet feature_compat;
5972 CompatSet::FeatureSet feature_ro_compat;
5973 CompatSet::FeatureSet feature_incompat;
5974 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
5975 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
5976 feature_incompat);
5977 o.push_back(new FSSuperblock(z));
5978 z.omap_backend = "rocksdb";
5979 o.push_back(new FSSuperblock(z));
5980 }