]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/env/io_posix.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rocksdb / env / io_posix.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef ROCKSDB_LIB_IO_POSIX
11 #include "env/io_posix.h"
12
13 #include <errno.h>
14 #include <fcntl.h>
15
16 #include <algorithm>
17 #if defined(OS_LINUX)
18 #include <linux/fs.h>
19 #ifndef FALLOC_FL_KEEP_SIZE
20 #include <linux/falloc.h>
21 #endif
22 #endif
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/ioctl.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #ifdef OS_LINUX
31 #include <sys/statfs.h>
32 #include <sys/sysmacros.h>
33 #endif
34 #include "monitoring/iostats_context_imp.h"
35 #include "port/port.h"
36 #include "port/stack_trace.h"
37 #include "rocksdb/slice.h"
38 #include "test_util/sync_point.h"
39 #include "util/autovector.h"
40 #include "util/coding.h"
41 #include "util/string_util.h"
42
43 #if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
44 #define F_LINUX_SPECIFIC_BASE 1024
45 #define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
46 #endif
47
48 namespace ROCKSDB_NAMESPACE {
49
// Build an error message of the form "<context>: <file_name>"; when no file
// name is known the context alone is returned.
std::string IOErrorMsg(const std::string& context,
                       const std::string& file_name) {
  return file_name.empty() ? context : context + ": " + file_name;
}
57
// Map a POSIX errno value to the matching IOStatus category.
// file_name can be left empty if it is unknown.
IOStatus IOError(const std::string& context, const std::string& file_name,
                 int err_number) {
  switch (err_number) {
    case ENOSPC: {
      // Out-of-space may resolve itself once space is freed, so mark the
      // status retryable.
      IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name),
                                     errnoStr(err_number).c_str());
      s.SetRetryable(true);
      return s;
    }
    case ESTALE:
      // Stale file handle (e.g. NFS): surfaced via a dedicated subcode.
      return IOStatus::IOError(IOStatus::kStaleFile);
    case ENOENT:
      return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
                                    errnoStr(err_number).c_str());
    default:
      return IOStatus::IOError(IOErrorMsg(context, file_name),
                               errnoStr(err_number).c_str());
  }
}
78
79 // A wrapper for fadvise, if the platform doesn't support fadvise,
80 // it will simply return 0.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
#ifdef OS_LINUX
  return posix_fadvise(fd, offset, len, advice);
#else
  // No fadvise on this platform: silently ignore the hint and report success.
  static_cast<void>(fd);
  static_cast<void>(offset);
  static_cast<void>(len);
  static_cast<void>(advice);
  return 0;
#endif
}
92
// A wrapper for madvise, if the platform doesn't support madvise,
// it will simply return 0.
int Madvise(void* addr, size_t len, int advice) {
#ifdef OS_LINUX
  return posix_madvise(addr, len, advice);
#else
  (void)addr;
  (void)len;
  (void)advice;
  return 0;  // simply do nothing.
#endif
}
105
106 namespace {
107
108 // On MacOS (and probably *BSD), the posix write and pwrite calls do not support
109 // buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
110 // cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
111 // the writes aligned.
112
// Write nbyte bytes from buf to fd, retrying on EINTR. Chunks each write()
// at 1GB because MacOS/*BSD reject buffers of 2^31-1 bytes or more; 1GB also
// keeps chunked writes aligned. Returns false on the first hard error.
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
  constexpr size_t kLimit1Gb = 1UL << 30;

  const char* cursor = buf;
  size_t remaining = nbyte;

  while (remaining > 0) {
    const size_t chunk = std::min(remaining, kLimit1Gb);
    const ssize_t written = write(fd, cursor, chunk);
    if (written < 0) {
      if (errno == EINTR) {
        continue;  // interrupted by a signal: retry the same chunk
      }
      return false;
    }
    remaining -= written;
    cursor += written;
  }
  return true;
}
134
// Positioned variant of PosixWrite: writes nbyte bytes from buf at `offset`
// via pwrite(), retrying on EINTR and capping each syscall at 1GB (MacOS/*BSD
// reject buffers >= 2^31-1 bytes). Returns false on the first hard error.
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
  constexpr size_t kLimit1Gb = 1UL << 30;

  const char* cursor = buf;
  size_t remaining = nbyte;

  while (remaining > 0) {
    const size_t chunk = std::min(remaining, kLimit1Gb);
    const ssize_t written = pwrite(fd, cursor, chunk, offset);
    if (written < 0) {
      if (errno == EINTR) {
        continue;  // interrupted by a signal: retry at the same offset
      }
      return false;
    }
    remaining -= written;
    offset += written;
    cursor += written;
  }

  return true;
}
158
159 #ifdef ROCKSDB_RANGESYNC_PRESENT
160
161 #if !defined(ZFS_SUPER_MAGIC)
162 // The magic number for ZFS was not exposed until recently. It should be fixed
163 // forever so we can just copy the magic number here.
164 #define ZFS_SUPER_MAGIC 0x2fc12fc1
165 #endif
166
bool IsSyncFileRangeSupported(int fd) {
  // This function tracks and checks for cases where we know `sync_file_range`
  // definitely will not work properly despite passing the compile-time check
  // (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or if any of the checks
  // fail in unexpected ways, we allow `sync_file_range` to be used. This way
  // should minimize risk of impacting existing use cases.
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  assert(ret == 0);
  if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) {
    // Testing on ZFS showed the writeback did not happen asynchronously when
    // `sync_file_range` was called, even though it returned success. Avoid it
    // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
    // even though this'll incur extra I/O for metadata.
    return false;
  }

  // A zero-length, zero-flag call is a no-op that still reveals whether the
  // syscall is implemented at all.
  ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */);
  assert(!(ret == -1 && errno != ENOSYS));
  if (ret == -1 && errno == ENOSYS) {
    // `sync_file_range` is not implemented on all platforms even if
    // compile-time checks pass and a supported filesystem is in-use. For
    // example, using ext4 on WSL (Windows Subsystem for Linux),
    // `sync_file_range()` returns `ENOSYS`
    // ("Function not implemented").
    return false;
  }
  // None of the known cases matched, so allow `sync_file_range` use.
  return true;
}
197
198 #undef ZFS_SUPER_MAGIC
199
200 #endif // ROCKSDB_RANGESYNC_PRESENT
201
202 } // anonymous namespace
203
204 /*
205 * PosixSequentialFile
206 */
// Takes ownership of `file` (buffered mode) or `fd` (direct-I/O mode); the
// destructor closes whichever handle is in use.
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
                                         int fd, size_t logical_block_size,
                                         const EnvOptions& options)
    : filename_(fname),
      file_(file),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size) {
  // Direct reads and mmap reads are mutually exclusive.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
}
217
PosixSequentialFile::~PosixSequentialFile() {
  if (!use_direct_io()) {
    // Buffered mode owns a FILE*; fclose() also closes the underlying fd.
    assert(file_);
    fclose(file_);
  } else {
    // Direct-I/O mode only holds the raw descriptor.
    assert(fd_);
    close(fd_);
  }
}
227
// Buffered sequential read of up to n bytes into scratch. Only valid in
// non-direct-I/O mode; direct mode must use PositionedRead().
IOStatus PosixSequentialFile::Read(size_t n, const IOOptions& /*opts*/,
                                   Slice* result, char* scratch,
                                   IODebugContext* /*dbg*/) {
  assert(result != nullptr && !use_direct_io());
  IOStatus s;
  size_t r = 0;
  do {
    // Retry when no data was read solely because of signal interruption.
    clearerr(file_);
    r = fread_unlocked(scratch, 1, n, file_);
  } while (r == 0 && ferror(file_) && errno == EINTR);
  *result = Slice(scratch, r);
  if (r < n) {
    if (feof(file_)) {
      // We leave status as ok if we hit the end of the file
      // We also clear the error so that the reads can continue
      // if a new data is written to the file
      clearerr(file_);
    } else {
      // A partial read with an error: return a non-ok status
      s = IOError("While reading file sequentially", filename_, errno);
    }
  }
  return s;
}
252
// Direct-I/O positioned read: offset, length and buffer must all be aligned
// to the logical sector size. Loops until n bytes are read, EOF, or error.
IOStatus PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
                                             const IOOptions& /*opts*/,
                                             Slice* result, char* scratch,
                                             IODebugContext* /*dbg*/) {
  assert(use_direct_io());
  assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));

  IOStatus s;
  ssize_t r = -1;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      // r == 0 is EOF; EINTR is retried; any other error ends the loop.
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (!IsSectorAligned(r, GetRequiredBufferAlignment())) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    // NOTE(review): `offset` has been advanced by earlier partial reads, so
    // the message reports the failing position, not the original argument.
    s = IOError("While pread " + std::to_string(n) + " bytes from offset " +
                    std::to_string(offset),
                filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}
292
293 IOStatus PosixSequentialFile::Skip(uint64_t n) {
294 if (fseek(file_, static_cast<long int>(n), SEEK_CUR)) {
295 return IOError("While fseek to skip " + std::to_string(n) + " bytes",
296 filename_, errno);
297 }
298 return IOStatus::OK();
299 }
300
// Drop cached pages for [offset, offset+length) from the OS page cache.
// No-op on non-Linux platforms and in direct-I/O mode (which bypasses the
// page cache entirely).
IOStatus PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  if (!use_direct_io()) {
    // free OS pages
    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      return IOError("While fadvise NotNeeded offset " +
                         std::to_string(offset) + " len " +
                         std::to_string(length),
                     filename_, errno);
    }
  }
  return IOStatus::OK();
#endif
}
320
321 /*
322 * PosixRandomAccessFile
323 */
324 #if defined(OS_LINUX)
// Builds a unique file id from (device, inode, generation), each encoded as
// a varint64 into `id`. Returns the number of bytes written, or 0 on failure
// (buffer possibly too small, or a syscall failed).
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  long version = 0;
  // Inode numbers can be recycled; the filesystem generation counter
  // disambiguates reuse of the same inode.
  result = ioctl(fd, FS_IOC_GETVERSION, &version);
  TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
  if (result == -1) {
    return 0;
  }
  uint64_t uversion = (uint64_t)version;

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, uversion);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
351 #endif
352
353 #if defined(OS_MACOSX) || defined(OS_AIX)
// MacOS/AIX variant: builds the unique id from (device, inode, st_gen) since
// FS_IOC_GETVERSION is Linux-specific. Returns bytes written, or 0 on failure.
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, buf.st_gen);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
372 #endif
373
374 #ifdef OS_LINUX
// Strip a single trailing '/' so "/a/b" and "/a/b/" map to the same cache
// key; a lone "/" (the filesystem root) is preserved.
std::string RemoveTrailingSlash(const std::string& path) {
  if (path.size() > 1 && path.back() == '/') {
    return path.substr(0, path.size() - 1);
  }
  return path;
}
382
// Reference-count and cache the logical block size of each directory.
// Sizes are queried outside the lock (may do filesystem I/O); refcounts are
// bumped under the write lock at the end.
Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
    const std::vector<std::string>& directories) {
  // Normalize paths so "/a/b" and "/a/b/" share one cache entry.
  std::vector<std::string> dirs;
  dirs.reserve(directories.size());
  for (auto& d : directories) {
    dirs.emplace_back(RemoveTrailingSlash(d));
  }

  // Under the read lock, collect only the directories not cached yet.
  std::map<std::string, size_t> dir_sizes;
  {
    ReadLock lock(&cache_mutex_);
    for (const auto& dir : dirs) {
      if (cache_.find(dir) == cache_.end()) {
        dir_sizes.emplace(dir, 0);
      }
    }
  }

  // Query block sizes with no lock held.
  Status s;
  for (auto& dir_size : dir_sizes) {
    s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second);
    if (!s.ok()) {
      return s;
    }
  }

  // Under the write lock, bump refcounts and record freshly fetched sizes.
  WriteLock lock(&cache_mutex_);
  for (const auto& dir : dirs) {
    auto& v = cache_[dir];
    v.ref++;
    auto dir_size = dir_sizes.find(dir);
    if (dir_size != dir_sizes.end()) {
      v.size = dir_size->second;
    }
  }
  return s;
}
420
// Release one reference per directory; an entry is erased from the cache
// once its refcount drops to zero.
void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
    const std::vector<std::string>& directories) {
  // Normalize the same way as RefAndCacheLogicalBlockSize so keys match.
  std::vector<std::string> dirs;
  dirs.reserve(directories.size());
  for (auto& dir : directories) {
    dirs.emplace_back(RemoveTrailingSlash(dir));
  }

  WriteLock lock(&cache_mutex_);
  for (const auto& dir : dirs) {
    auto it = cache_.find(dir);
    if (it != cache_.end() && !(--(it->second.ref))) {
      cache_.erase(it);
    }
  }
}
437
// Look up the cached block size for the directory containing `fname`; on a
// cache miss, query the fd directly (the result is not inserted here).
size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
                                                  int fd) {
  // NOTE(review): if fname contains no '/', find_last_of returns npos and the
  // whole fname becomes the key — presumably callers pass full paths; verify.
  std::string dir = fname.substr(0, fname.find_last_of("/"));
  if (dir.empty()) {
    dir = "/";
  }
  {
    ReadLock lock(&cache_mutex_);
    auto it = cache_.find(dir);
    if (it != cache_.end()) {
      return it->second.size;
    }
  }
  return get_logical_block_size_of_fd_(fd);
}
453 #endif
454
455 Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
456 size_t* size) {
457 int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
458 if (fd == -1) {
459 close(fd);
460 return Status::IOError("Cannot open directory " + directory);
461 }
462 *size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
463 close(fd);
464 return Status::OK();
465 }
466
// Resolve the logical block size of the block device backing `fd` by reading
// /sys/dev/block/<major>:<minor>/queue/logical_block_size. Falls back to
// kDefaultPageSize on any failure or implausible (non power-of-two) value.
size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
#ifdef OS_LINUX
  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return kDefaultPageSize;
  }
  if (major(buf.st_dev) == 0) {
    // Unnamed devices (e.g. non-device mounts), reserved as null device number.
    // These don't have an entry in /sys/dev/block/. Return a sensible default.
    return kDefaultPageSize;
  }

  // Reading queue/logical_block_size does not require special permissions.
  const int kBufferSize = 100;
  char path[kBufferSize];
  char real_path[PATH_MAX + 1];
  snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
           minor(buf.st_dev));
  if (realpath(path, real_path) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(real_path);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }
  // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
  // and nvme0n1 have it.
  // $ ls -al '/sys/dev/block/8:3'
  // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
  // ../../block/sda/sda3
  // $ ls -al '/sys/dev/block/259:4'
  // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
  // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
  size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
  if (parent_end == std::string::npos) {
    return kDefaultPageSize;
  }
  size_t parent_begin = device_dir.rfind('/', parent_end - 1);
  if (parent_begin == std::string::npos) {
    return kDefaultPageSize;
  }
  std::string parent =
      device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
  std::string child = device_dir.substr(parent_end + 1, std::string::npos);
  // For partitions, step up to the parent device directory — that is the one
  // carrying the `queue/` subdirectory.
  if (parent != "block" &&
      (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
    device_dir = device_dir.substr(0, parent_end);
  }
  std::string fname = device_dir + "/queue/logical_block_size";
  FILE* fp;
  size_t size = 0;
  fp = fopen(fname.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t len = 0;
    if (getline(&line, &len, fp) != -1) {
      sscanf(line, "%zu", &size);
    }
    free(line);
    fclose(fp);
  }
  // Only trust a non-zero power-of-two size.
  if (size != 0 && (size & (size - 1)) == 0) {
    return size;
  }
#endif
  (void)fd;
  return kDefaultPageSize;
}
536
537 /*
538 * PosixRandomAccessFile
539 *
540 * pread() based random-access
541 */
// Takes ownership of `fd` (closed in the destructor). The optional
// thread-local io_uring pointer is only wired in when liburing is available.
PosixRandomAccessFile::PosixRandomAccessFile(
    const std::string& fname, int fd, size_t logical_block_size,
    const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
    ,
    ThreadLocalPtr* thread_local_io_urings
#endif
    )
    : filename_(fname),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size)
#if defined(ROCKSDB_IOURING_PRESENT)
      ,
      thread_local_io_urings_(thread_local_io_urings)
#endif
{
  // mmap-based reads are handled by PosixMmapReadableFile, never this class.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
  assert(!options.use_mmap_reads);
}
562
// Best-effort close; the return value of close() is ignored.
PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); }
564
// pread()-based random-access read. In direct-I/O mode, offset/length/buffer
// must be sector-aligned. Loops until n bytes are read, EOF, or a hard error.
IOStatus PosixRandomAccessFile::Read(uint64_t offset, size_t n,
                                     const IOOptions& /*opts*/, Slice* result,
                                     char* scratch,
                                     IODebugContext* /*dbg*/) const {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
  }
  IOStatus s;
  ssize_t r = -1;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      // r == 0 is EOF; EINTR is retried; any other error ends the loop.
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (use_direct_io() &&
        r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError("While pread offset " + std::to_string(offset) + " len " +
                    std::to_string(n),
                filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}
605
// Batched random-access reads. When liburing is available and a thread-local
// io_uring instance can be created, requests are submitted in batches of up
// to kIoUringDepth; short reads are resubmitted for their remainder.
// Otherwise falls back to the serialized default implementation.
IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
                                          const IOOptions& options,
                                          IODebugContext* dbg) {
  if (use_direct_io()) {
    for (size_t i = 0; i < num_reqs; i++) {
      assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
    }
  }

#if defined(ROCKSDB_IOURING_PRESENT)
  // Lazily create one io_uring per thread and cache it thread-locally.
  struct io_uring* iu = nullptr;
  if (thread_local_io_urings_) {
    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
    if (iu == nullptr) {
      iu = CreateIOUring();
      if (iu != nullptr) {
        thread_local_io_urings_->Reset(iu);
      }
    }
  }

  // Init failed, platform doesn't support io_uring. Fall back to
  // serialized reads
  if (iu == nullptr) {
    return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
  }

  IOStatus ios = IOStatus::OK();

  // Pairs each request with its iovec and a count of bytes completed so far,
  // so short reads can be resubmitted for the remaining range.
  struct WrappedReadRequest {
    FSReadRequest* req;
    struct iovec iov;
    size_t finished_len;
    explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
  };

  autovector<WrappedReadRequest, 32> req_wraps;
  autovector<WrappedReadRequest*, 4> incomplete_rq_list;
  std::unordered_set<WrappedReadRequest*> wrap_cache;

  for (size_t i = 0; i < num_reqs; i++) {
    req_wraps.emplace_back(&reqs[i]);
  }

  size_t reqs_off = 0;
  while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
    size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();

    // If requests exceed depth, split it into batches
    if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;

    assert(incomplete_rq_list.size() <= this_reqs);
    for (size_t i = 0; i < this_reqs; i++) {
      WrappedReadRequest* rep_to_submit;
      // Partially-completed requests from the previous batch go first.
      if (i < incomplete_rq_list.size()) {
        rep_to_submit = incomplete_rq_list[i];
      } else {
        rep_to_submit = &req_wraps[reqs_off++];
      }
      assert(rep_to_submit->req->len > rep_to_submit->finished_len);
      rep_to_submit->iov.iov_base =
          rep_to_submit->req->scratch + rep_to_submit->finished_len;
      rep_to_submit->iov.iov_len =
          rep_to_submit->req->len - rep_to_submit->finished_len;

      struct io_uring_sqe* sqe;
      sqe = io_uring_get_sqe(iu);
      io_uring_prep_readv(
          sqe, fd_, &rep_to_submit->iov, 1,
          rep_to_submit->req->offset + rep_to_submit->finished_len);
      io_uring_sqe_set_data(sqe, rep_to_submit);
      wrap_cache.emplace(rep_to_submit);
    }
    incomplete_rq_list.clear();

    ssize_t ret =
        io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
        &ret);
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
        iu);

    if (static_cast<size_t>(ret) != this_reqs) {
      fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
      // If error happens and we submitted fewer than expected, it is an
      // exception case and we don't retry here. We should still consume
      // what is is submitted in the ring.
      for (ssize_t i = 0; i < ret; i++) {
        struct io_uring_cqe* cqe = nullptr;
        io_uring_wait_cqe(iu, &cqe);
        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
      }
      return IOStatus::IOError("io_uring_submit_and_wait() requested " +
                               std::to_string(this_reqs) + " but returned " +
                               std::to_string(ret));
    }

    for (size_t i = 0; i < this_reqs; i++) {
      struct io_uring_cqe* cqe = nullptr;
      WrappedReadRequest* req_wrap;

      // We could use the peek variant here, but this seems safer in terms
      // of our initial wait not reaping all completions
      ret = io_uring_wait_cqe(iu, &cqe);
      TEST_SYNC_POINT_CALLBACK(
          "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
      if (ret) {
        ios = IOStatus::IOError("io_uring_wait_cqe() returns " +
                                std::to_string(ret));

        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
        continue;
      }

      req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
      // Reset cqe data to catch any stray reuse of it
      static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
      // Check that we got a valid unique cqe data
      auto wrap_check = wrap_cache.find(req_wrap);
      if (wrap_check == wrap_cache.end()) {
        fprintf(stderr,
                "PosixRandomAccessFile::MultiRead: "
                "Bad cqe data from IO uring - %p\n",
                req_wrap);
        port::PrintStack();
        ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
                                std::to_string((uint64_t)req_wrap));
        continue;
      }
      wrap_cache.erase(wrap_check);

      FSReadRequest* req = req_wrap->req;
      size_t bytes_read = 0;
      bool read_again = false;
      UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
                   false /*async_read*/, use_direct_io(),
                   GetRequiredBufferAlignment(), req_wrap->finished_len, req,
                   bytes_read, read_again);
      int32_t res = cqe->res;
      if (res >= 0) {
        if (bytes_read == 0) {
          if (read_again) {
            // Fall back to a synchronous pread for the remaining range.
            Slice tmp_slice;
            req->status =
                Read(req->offset + req_wrap->finished_len,
                     req->len - req_wrap->finished_len, options, &tmp_slice,
                     req->scratch + req_wrap->finished_len, dbg);
            req->result =
                Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
          }
          // else It means EOF so no need to do anything.
        } else if (bytes_read < req_wrap->iov.iov_len) {
          // Short read: resubmit the remainder in the next batch.
          incomplete_rq_list.push_back(req_wrap);
        }
      }
      io_uring_cqe_seen(iu, cqe);
    }
    wrap_cache.clear();
  }
  return ios;
#else
  return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
#endif
}
778
// Hint the OS to load [offset, offset+n) into the page cache ahead of time.
// A no-op in direct-I/O mode, which bypasses the page cache.
IOStatus PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n,
                                         const IOOptions& /*opts*/,
                                         IODebugContext* /*dbg*/) {
  IOStatus s;
  if (!use_direct_io()) {
    ssize_t r = 0;
#ifdef OS_LINUX
    r = readahead(fd_, offset, n);
#endif
#ifdef OS_MACOSX
    // MacOS has no readahead(); F_RDADVISE is the closest equivalent.
    radvisory advice;
    advice.ra_offset = static_cast<off_t>(offset);
    advice.ra_count = static_cast<int>(n);
    r = fcntl(fd_, F_RDADVISE, &advice);
#endif
    if (r == -1) {
      s = IOError("While prefetching offset " + std::to_string(offset) +
                      " len " + std::to_string(n),
                  filename_, errno);
    }
  }
  return s;
}
802
803 #if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
// Delegates to the platform-specific fstat/ioctl-based id generator.
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
807 #endif
808
809 void PosixRandomAccessFile::Hint(AccessPattern pattern) {
810 if (use_direct_io()) {
811 return;
812 }
813 switch (pattern) {
814 case kNormal:
815 Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL);
816 break;
817 case kRandom:
818 Fadvise(fd_, 0, 0, POSIX_FADV_RANDOM);
819 break;
820 case kSequential:
821 Fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
822 break;
823 case kWillNeed:
824 Fadvise(fd_, 0, 0, POSIX_FADV_WILLNEED);
825 break;
826 case kWontNeed:
827 Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED);
828 break;
829 default:
830 assert(false);
831 break;
832 }
833 }
834
// Drop cached pages for [offset, offset+length). No-op in direct-I/O mode
// (page cache is never populated) and on non-Linux platforms.
IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return IOStatus::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return IOStatus::OK();
  }
  return IOError("While fadvise NotNeeded offset " + std::to_string(offset) +
                     " len " + std::to_string(length),
                 filename_, errno);
#endif
}
854
// Submit a single asynchronous read through io_uring. On success, *io_handle
// receives a heap-allocated Posix_IOHandle (freed via *del_fn) that the
// poller uses to complete the request and invoke `cb`. Returns NotSupported
// when liburing is unavailable or ring creation fails.
IOStatus PosixRandomAccessFile::ReadAsync(
    FSReadRequest& req, const IOOptions& /*opts*/,
    std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
  if (use_direct_io()) {
    assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(req.len, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(req.scratch, GetRequiredBufferAlignment()));
  }

#if defined(ROCKSDB_IOURING_PRESENT)
  // io_uring_queue_init.
  struct io_uring* iu = nullptr;
  if (thread_local_io_urings_) {
    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
    if (iu == nullptr) {
      iu = CreateIOUring();
      if (iu != nullptr) {
        thread_local_io_urings_->Reset(iu);
      }
    }
  }

  // Init failed, platform doesn't support io_uring.
  if (iu == nullptr) {
    return IOStatus::NotSupported("ReadAsync");
  }

  // Allocate io_handle.
  IOHandleDeleter deletefn = [](void* args) -> void {
    delete (static_cast<Posix_IOHandle*>(args));
    args = nullptr;
  };

  // Initialize Posix_IOHandle.
  Posix_IOHandle* posix_handle =
      new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch,
                         use_direct_io(), GetRequiredBufferAlignment());
  posix_handle->iov.iov_base = req.scratch;
  posix_handle->iov.iov_len = req.len;

  *io_handle = static_cast<void*>(posix_handle);
  *del_fn = deletefn;

  // Step 3: io_uring_sqe_set_data
  struct io_uring_sqe* sqe;
  sqe = io_uring_get_sqe(iu);

  io_uring_prep_readv(sqe, fd_, /*sqe->addr=*/&posix_handle->iov,
                      /*sqe->len=*/1, /*sqe->offset=*/posix_handle->offset);

  // Sets sqe->user_data to posix_handle.
  io_uring_sqe_set_data(sqe, posix_handle);

  // Step 4: io_uring_submit
  ssize_t ret = io_uring_submit(iu);
  if (ret < 0) {
    fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
    return IOStatus::IOError("io_uring_submit() requested but returned " +
                             std::to_string(ret));
  }
  return IOStatus::OK();
#else
  (void)req;
  (void)cb;
  (void)cb_arg;
  (void)io_handle;
  (void)del_fn;
  return IOStatus::NotSupported("ReadAsync");
#endif
}
926
927 /*
928 * PosixMmapReadableFile
929 *
930 * mmap() based random-access
931 */
932 // base[0,length-1] contains the mmapped contents of the file.
// Takes ownership of `fd` and of the already-established mapping `base` of
// `length` bytes; both are released in the destructor.
PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
                                             const std::string& fname,
                                             void* base, size_t length,
                                             const EnvOptions& options)
    : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
#ifdef NDEBUG
  (void)options;
#endif
  fd_ = fd_ + 0;  // suppress the warning for used variables
  assert(options.use_mmap_reads);
  assert(!options.use_direct_reads);
}
945
946 PosixMmapReadableFile::~PosixMmapReadableFile() {
947 int ret = munmap(mmapped_region_, length_);
948 if (ret != 0) {
949 fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
950 mmapped_region_, length_);
951 }
952 close(fd_);
953 }
954
955 IOStatus PosixMmapReadableFile::Read(uint64_t offset, size_t n,
956 const IOOptions& /*opts*/, Slice* result,
957 char* /*scratch*/,
958 IODebugContext* /*dbg*/) const {
959 IOStatus s;
960 if (offset > length_) {
961 *result = Slice();
962 return IOError("While mmap read offset " + std::to_string(offset) +
963 " larger than file length " + std::to_string(length_),
964 filename_, EINVAL);
965 } else if (offset + n > length_) {
966 n = static_cast<size_t>(length_ - offset);
967 }
968 *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
969 return s;
970 }
971
972 void PosixMmapReadableFile::Hint(AccessPattern pattern) {
973 switch (pattern) {
974 case kNormal:
975 Madvise(mmapped_region_, length_, POSIX_MADV_NORMAL);
976 break;
977 case kRandom:
978 Madvise(mmapped_region_, length_, POSIX_MADV_RANDOM);
979 break;
980 case kSequential:
981 Madvise(mmapped_region_, length_, POSIX_MADV_SEQUENTIAL);
982 break;
983 case kWillNeed:
984 Madvise(mmapped_region_, length_, POSIX_MADV_WILLNEED);
985 break;
986 case kWontNeed:
987 Madvise(mmapped_region_, length_, POSIX_MADV_DONTNEED);
988 break;
989 default:
990 assert(false);
991 break;
992 }
993 }
994
// Drop cached pages for [offset, offset+length) of the underlying file.
// No-op on non-Linux platforms.
IOStatus PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return IOStatus::OK();
  }
  return IOError("While fadvise not needed. Offset " + std::to_string(offset) +
                     " len" + std::to_string(length),
                 filename_, errno);
#endif
}
1011
1012 /*
1013 * PosixMmapFile
1014 *
1015 * We preallocate up to an extra megabyte and use memcpy to append new
1016 * data to the file. This is safe since we either properly close the
1017 * file before reading from it, or for log files, the reading code
1018 * knows enough to skip zero suffixes.
1019 */
1020 IOStatus PosixMmapFile::UnmapCurrentRegion() {
1021 TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
1022 if (base_ != nullptr) {
1023 int munmap_status = munmap(base_, limit_ - base_);
1024 if (munmap_status != 0) {
1025 return IOError("While munmap", filename_, munmap_status);
1026 }
1027 file_offset_ += limit_ - base_;
1028 base_ = nullptr;
1029 limit_ = nullptr;
1030 last_sync_ = nullptr;
1031 dst_ = nullptr;
1032
1033 // Increase the amount we map the next time, but capped at 1MB
1034 if (map_size_ < (1 << 20)) {
1035 map_size_ *= 2;
1036 }
1037 }
1038 return IOStatus::OK();
1039 }
1040
// Extend the file by map_size_ bytes (via fallocate) and map the new range
// for writing. Requires fallocate support; otherwise returns NotSupported.
IOStatus PosixMmapFile::MapNewRegion() {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  assert(base_ == nullptr);
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0");
  // we can't fallocate with FALLOC_FL_KEEP_SIZE here
  if (allow_fallocate_) {
    IOSTATS_TIMER_GUARD(allocate_nanos);
    int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
    if (alloc_status != 0) {
      // fallback to posix_fallocate
      alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
    }
    if (alloc_status != 0) {
      return IOStatus::IOError("Error allocating space to file : " + filename_ +
                               "Error : " + errnoStr(alloc_status).c_str());
    }
  }

  TEST_KILL_RANDOM("PosixMmapFile::Append:1");
  void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_,
                   file_offset_);
  if (ptr == MAP_FAILED) {
    return IOStatus::IOError("MMap failed on " + filename_);
  }
  TEST_KILL_RANDOM("PosixMmapFile::Append:2");

  // Fresh window: nothing written or synced in it yet.
  base_ = reinterpret_cast<char*>(ptr);
  limit_ = base_ + map_size_;
  dst_ = base_;
  last_sync_ = base_;
  return IOStatus::OK();
#else
  return IOStatus::NotSupported("This platform doesn't support fallocate()");
#endif
}
1076
1077 IOStatus PosixMmapFile::Msync() {
1078 if (dst_ == last_sync_) {
1079 return IOStatus::OK();
1080 }
1081 // Find the beginnings of the pages that contain the first and last
1082 // bytes to be synced.
1083 size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
1084 size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
1085 last_sync_ = dst_;
1086 TEST_KILL_RANDOM("PosixMmapFile::Msync:0");
1087 if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
1088 return IOError("While msync", filename_, errno);
1089 }
1090 return IOStatus::OK();
1091 }
1092
// Wraps an already-open descriptor `fd` for mmap-based appends.
// `page_size` must be a power of two (required by the page-boundary math in
// Msync/UnmapCurrentRegion). The first mapping is 64KB rounded up to a page;
// it doubles on each remap, capped at 1MB (see UnmapCurrentRegion).
PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      page_size_(page_size),
      map_size_(Roundup(65536, page_size)),  // initial 64KB mapping
      base_(nullptr),
      limit_(nullptr),
      dst_(nullptr),
      last_sync_(nullptr),
      file_offset_(0) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#else
  (void)options;
#endif
  // Power-of-two check: page-boundary truncation relies on it.
  assert((page_size & (page_size - 1)) == 0);
  assert(options.use_mmap_writes);
  assert(!options.use_direct_writes);
}
1114
1115 PosixMmapFile::~PosixMmapFile() {
1116 if (fd_ >= 0) {
1117 IOStatus s = PosixMmapFile::Close(IOOptions(), nullptr);
1118 s.PermitUncheckedError();
1119 }
1120 }
1121
1122 IOStatus PosixMmapFile::Append(const Slice& data, const IOOptions& /*opts*/,
1123 IODebugContext* /*dbg*/) {
1124 const char* src = data.data();
1125 size_t left = data.size();
1126 while (left > 0) {
1127 assert(base_ <= dst_);
1128 assert(dst_ <= limit_);
1129 size_t avail = limit_ - dst_;
1130 if (avail == 0) {
1131 IOStatus s = UnmapCurrentRegion();
1132 if (!s.ok()) {
1133 return s;
1134 }
1135 s = MapNewRegion();
1136 if (!s.ok()) {
1137 return s;
1138 }
1139 TEST_KILL_RANDOM("PosixMmapFile::Append:0");
1140 }
1141
1142 size_t n = (left <= avail) ? left : avail;
1143 assert(dst_);
1144 memcpy(dst_, src, n);
1145 dst_ += n;
1146 src += n;
1147 left -= n;
1148 }
1149 return IOStatus::OK();
1150 }
1151
// Unmaps the current region, trims the never-written tail of the file, and
// closes the descriptor. Reports the first error encountered but always
// releases fd_ and the region pointers.
IOStatus PosixMmapFile::Close(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  IOStatus s;
  // Bytes mapped but never written. Must be captured BEFORE the unmap below,
  // which resets limit_/dst_ to nullptr.
  size_t unused = limit_ - dst_;

  s = UnmapCurrentRegion();
  if (!s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  } else if (unused > 0) {
    // Trim the extra space at the end of the file.
    // UnmapCurrentRegion advanced file_offset_ past the whole mapping, so
    // the true data size is file_offset_ - unused.
    if (ftruncate(fd_, file_offset_ - unused) < 0) {
      s = IOError("While ftruncating mmaped file", filename_, errno);
    }
  }

  if (close(fd_) < 0) {
    // Preserve the earlier, more specific error if there was one.
    if (s.ok()) {
      s = IOError("While closing mmapped file", filename_, errno);
    }
  }

  fd_ = -1;
  base_ = nullptr;
  limit_ = nullptr;
  return s;
}
1178
IOStatus PosixMmapFile::Flush(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  // No-op: Append() writes directly into the mmap'd region, so there is no
  // user-space buffer to flush. Durability is provided by Sync()/Fsync().
  return IOStatus::OK();
}
1183
1184 IOStatus PosixMmapFile::Sync(const IOOptions& /*opts*/,
1185 IODebugContext* /*dbg*/) {
1186 #ifdef HAVE_FULLFSYNC
1187 if (::fcntl(fd_, F_FULLFSYNC) < 0) {
1188 return IOError("while fcntl(F_FULLSYNC) mmapped file", filename_, errno);
1189 }
1190 #else // HAVE_FULLFSYNC
1191 if (fdatasync(fd_) < 0) {
1192 return IOError("While fdatasync mmapped file", filename_, errno);
1193 }
1194 #endif // HAVE_FULLFSYNC
1195
1196 return Msync();
1197 }
1198
1199 /**
1200 * Flush data as well as metadata to stable storage.
1201 */
1202 IOStatus PosixMmapFile::Fsync(const IOOptions& /*opts*/,
1203 IODebugContext* /*dbg*/) {
1204 #ifdef HAVE_FULLFSYNC
1205 if (::fcntl(fd_, F_FULLFSYNC) < 0) {
1206 return IOError("While fcntl(F_FULLSYNC) on mmaped file", filename_, errno);
1207 }
1208 #else // HAVE_FULLFSYNC
1209 if (fsync(fd_) < 0) {
1210 return IOError("While fsync mmaped file", filename_, errno);
1211 }
1212 #endif // HAVE_FULLFSYNC
1213
1214 return Msync();
1215 }
1216
1217 /**
1218 * Get the size of valid data in the file. This will not match the
1219 * size that is returned from the filesystem because we use mmap
1220 * to extend file by map_size every time.
1221 */
1222 uint64_t PosixMmapFile::GetFileSize(const IOOptions& /*opts*/,
1223 IODebugContext* /*dbg*/) {
1224 size_t used = dst_ - base_;
1225 return file_offset_ + used;
1226 }
1227
1228 IOStatus PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
1229 #ifndef OS_LINUX
1230 (void)offset;
1231 (void)length;
1232 return IOStatus::OK();
1233 #else
1234 // free OS pages
1235 int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1236 if (ret == 0) {
1237 return IOStatus::OK();
1238 }
1239 return IOError("While fadvise NotNeeded mmapped file", filename_, errno);
1240 #endif
1241 }
1242
1243 #ifdef ROCKSDB_FALLOCATE_PRESENT
1244 IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len,
1245 const IOOptions& /*opts*/,
1246 IODebugContext* /*dbg*/) {
1247 assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1248 assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1249 TEST_KILL_RANDOM("PosixMmapFile::Allocate:0");
1250 int alloc_status = 0;
1251 if (allow_fallocate_) {
1252 alloc_status =
1253 fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
1254 static_cast<off_t>(offset), static_cast<off_t>(len));
1255 }
1256 if (alloc_status == 0) {
1257 return IOStatus::OK();
1258 } else {
1259 return IOError("While fallocate offset " + std::to_string(offset) +
1260 " len " + std::to_string(len),
1261 filename_, errno);
1262 }
1263 }
1264 #endif
1265
1266 /*
1267 * PosixWritableFile
1268 *
1269 * Use posix write to write data to a file.
1270 */
// Wraps an already-open descriptor `fd` for write()/pwrite()-based appends.
// `logical_block_size` is the sector size used for direct-I/O alignment
// checks in Append/PositionedAppend.
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
                                     size_t logical_block_size,
                                     const EnvOptions& options)
    : FSWritableFile(options),
      filename_(fname),
      use_direct_io_(options.use_direct_writes),
      fd_(fd),
      filesize_(0),
      logical_sector_size_(logical_block_size) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
#ifdef ROCKSDB_RANGESYNC_PRESENT
  // Probe once at construction whether sync_file_range() works for this fd;
  // RangeSync consults the cached answer.
  sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
#endif  // ROCKSDB_RANGESYNC_PRESENT
  assert(!options.use_mmap_writes);
}
1289
1290 PosixWritableFile::~PosixWritableFile() {
1291 if (fd_ >= 0) {
1292 IOStatus s = PosixWritableFile::Close(IOOptions(), nullptr);
1293 s.PermitUncheckedError();
1294 }
1295 }
1296
1297 IOStatus PosixWritableFile::Append(const Slice& data, const IOOptions& /*opts*/,
1298 IODebugContext* /*dbg*/) {
1299 if (use_direct_io()) {
1300 assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
1301 assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
1302 }
1303 const char* src = data.data();
1304 size_t nbytes = data.size();
1305
1306 if (!PosixWrite(fd_, src, nbytes)) {
1307 return IOError("While appending to file", filename_, errno);
1308 }
1309
1310 filesize_ += nbytes;
1311 return IOStatus::OK();
1312 }
1313
1314 IOStatus PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset,
1315 const IOOptions& /*opts*/,
1316 IODebugContext* /*dbg*/) {
1317 if (use_direct_io()) {
1318 assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
1319 assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
1320 assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
1321 }
1322 assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1323 const char* src = data.data();
1324 size_t nbytes = data.size();
1325 if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
1326 return IOError("While pwrite to file at offset " + std::to_string(offset),
1327 filename_, errno);
1328 }
1329 filesize_ = offset + nbytes;
1330 return IOStatus::OK();
1331 }
1332
1333 IOStatus PosixWritableFile::Truncate(uint64_t size, const IOOptions& /*opts*/,
1334 IODebugContext* /*dbg*/) {
1335 IOStatus s;
1336 int r = ftruncate(fd_, size);
1337 if (r < 0) {
1338 s = IOError("While ftruncate file to size " + std::to_string(size),
1339 filename_, errno);
1340 } else {
1341 filesize_ = size;
1342 }
1343 return s;
1344 }
1345
// Trims any preallocated-but-unwritten tail of the file and closes the
// descriptor. ftruncate/fallocate failures during trimming are deliberately
// swallowed; only a failed close() is surfaced as an error.
IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  IOStatus s;

  size_t block_size;
  size_t last_allocated_block;
  GetPreallocationStatus(&block_size, &last_allocated_block);
  TEST_SYNC_POINT_CALLBACK("PosixWritableFile::Close", &last_allocated_block);
  if (last_allocated_block > 0) {
    // trim the extra space preallocated at the end of the file
    // NOTE(ljin): we probably don't want to surface failure as an IOError,
    // but it will be nice to log these errors.
    int dummy __attribute__((__unused__));
    dummy = ftruncate(fd_, filesize_);
#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE)
    // in some file systems, ftruncate only trims trailing space if the
    // new file size is smaller than the current size. Calling fallocate
    // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
    // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
    // filesystems:
    //   XFS (since Linux 2.6.38)
    //   ext4 (since Linux 3.0)
    //   Btrfs (since Linux 3.7)
    //   tmpfs (since Linux 3.5)
    // We ignore error since failure of this operation does not affect
    // correctness.
    struct stat file_stats;
    int result = fstat(fd_, &file_stats);
    // After ftruncate, we check whether ftruncate has the correct behavior.
    // If not, we should hack it with FALLOC_FL_PUNCH_HOLE.
    // The check compares the block count the file size implies (size rounded
    // up to whole st_blksize blocks) with the blocks actually allocated
    // (st_blocks is in 512-byte units); a mismatch means preallocated space
    // is still attached beyond EOF.
    if (result == 0 &&
        (file_stats.st_size + file_stats.st_blksize - 1) /
                file_stats.st_blksize !=
            file_stats.st_blocks / (file_stats.st_blksize / 512)) {
      IOSTATS_TIMER_GUARD(allocate_nanos);
      if (allow_fallocate_) {
        fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
                  block_size * last_allocated_block - filesize_);
      }
    }
#endif
  }

  if (close(fd_) < 0) {
    s = IOError("While closing file after writing", filename_, errno);
  }
  fd_ = -1;
  return s;
}
1395
1396 // write out the cached data to the OS cache
IOStatus PosixWritableFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  // No-op: Append()/PositionedAppend() hand data to the OS directly, so
  // there is no user-space buffer to flush here.
  return IOStatus::OK();
}
1401
// Flushes file data to stable storage; metadata is not guaranteed
// (fdatasync). On platforms with F_FULLFSYNC (macOS — see the note in
// FsyncWithDirOptions), the stronger fcntl is used instead.
IOStatus PosixWritableFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
#ifdef HAVE_FULLFSYNC
  if (::fcntl(fd_, F_FULLFSYNC) < 0) {
    return IOError("while fcntl(F_FULLFSYNC)", filename_, errno);
  }
#else  // HAVE_FULLFSYNC
  if (fdatasync(fd_) < 0) {
    return IOError("While fdatasync", filename_, errno);
  }
#endif  // HAVE_FULLFSYNC
  return IOStatus::OK();
}
1415
// Flushes file data AND metadata to stable storage (fsync, or F_FULLFSYNC
// where available).
IOStatus PosixWritableFile::Fsync(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
#ifdef HAVE_FULLFSYNC
  if (::fcntl(fd_, F_FULLFSYNC) < 0) {
    return IOError("while fcntl(F_FULLFSYNC)", filename_, errno);
  }
#else  // HAVE_FULLFSYNC
  if (fsync(fd_) < 0) {
    return IOError("While fsync", filename_, errno);
  }
#endif  // HAVE_FULLFSYNC
  return IOStatus::OK();
}
1429
// Sync()/Fsync() here operate only on fd_, so concurrent calls from other
// threads are safe.
bool PosixWritableFile::IsSyncThreadSafe() const { return true; }
1431
uint64_t PosixWritableFile::GetFileSize(const IOOptions& /*opts*/,
                                        IODebugContext* /*dbg*/) {
  // Cached size maintained by Append/PositionedAppend/Truncate; no syscall.
  return filesize_;
}
1436
// Forwards the write-life-time hint to the kernel via fcntl(F_SET_RW_HINT).
// The cached write_hint_ is updated only when the fcntl succeeds, so a
// failed call leaves the previous hint in effect.
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
  // Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;  // unchanged hint — skip the redundant syscall
  }
  if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif  // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif  // OS_LINUX
}
1454
1455 IOStatus PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
1456 if (use_direct_io()) {
1457 return IOStatus::OK();
1458 }
1459 #ifndef OS_LINUX
1460 (void)offset;
1461 (void)length;
1462 return IOStatus::OK();
1463 #else
1464 // free OS pages
1465 int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
1466 if (ret == 0) {
1467 return IOStatus::OK();
1468 }
1469 return IOError("While fadvise NotNeeded", filename_, errno);
1470 #endif
1471 }
1472
1473 #ifdef ROCKSDB_FALLOCATE_PRESENT
1474 IOStatus PosixWritableFile::Allocate(uint64_t offset, uint64_t len,
1475 const IOOptions& /*opts*/,
1476 IODebugContext* /*dbg*/) {
1477 assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1478 assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
1479 TEST_KILL_RANDOM("PosixWritableFile::Allocate:0");
1480 IOSTATS_TIMER_GUARD(allocate_nanos);
1481 int alloc_status = 0;
1482 if (allow_fallocate_) {
1483 alloc_status =
1484 fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
1485 static_cast<off_t>(offset), static_cast<off_t>(len));
1486 }
1487 if (alloc_status == 0) {
1488 return IOStatus::OK();
1489 } else {
1490 return IOError("While fallocate offset " + std::to_string(offset) +
1491 " len " + std::to_string(len),
1492 filename_, errno);
1493 }
1494 }
1495 #endif
1496
// Asks the kernel to start writeback for [offset, offset+nbytes) via
// sync_file_range(); falls back to the generic FSWritableFile::RangeSync
// when the syscall is unavailable or unsupported on this filesystem.
IOStatus PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
                                      const IOOptions& opts,
                                      IODebugContext* dbg) {
#ifdef ROCKSDB_RANGESYNC_PRESENT
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  if (sync_file_range_supported_) {
    int ret;
    if (strict_bytes_per_sync_) {
      // Specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an offset/length
      // that spans all bytes written so far tells `sync_file_range` to wait for
      // any outstanding writeback requests to finish before issuing a new one.
      ret =
          sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes),
                          SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE);
    } else {
      // Async: initiate writeback for just this range without waiting.
      ret = sync_file_range(fd_, static_cast<off_t>(offset),
                            static_cast<off_t>(nbytes), SYNC_FILE_RANGE_WRITE);
    }
    if (ret != 0) {
      return IOError("While sync_file_range returned " + std::to_string(ret),
                     filename_, errno);
    }
    return IOStatus::OK();
  }
#endif  // ROCKSDB_RANGESYNC_PRESENT
  return FSWritableFile::RangeSync(offset, nbytes, opts, dbg);
}
1525
1526 #ifdef OS_LINUX
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
  // Delegates to the shared fd-based helper; writes up to max_size bytes
  // into `id` and returns the number of bytes written.
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
}
1530 #endif
1531
1532 /*
1533 * PosixRandomRWFile
1534 */
1535
// Wraps an already-open descriptor `fd` for positioned read/write access.
// The descriptor is owned by this object (closed in Close()/destructor).
PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
                                     const EnvOptions& /*options*/)
    : filename_(fname), fd_(fd) {}
1539
1540 PosixRandomRWFile::~PosixRandomRWFile() {
1541 if (fd_ >= 0) {
1542 IOStatus s = Close(IOOptions(), nullptr);
1543 s.PermitUncheckedError();
1544 }
1545 }
1546
1547 IOStatus PosixRandomRWFile::Write(uint64_t offset, const Slice& data,
1548 const IOOptions& /*opts*/,
1549 IODebugContext* /*dbg*/) {
1550 const char* src = data.data();
1551 size_t nbytes = data.size();
1552 if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
1553 return IOError("While write random read/write file at offset " +
1554 std::to_string(offset),
1555 filename_, errno);
1556 }
1557
1558 return IOStatus::OK();
1559 }
1560
1561 IOStatus PosixRandomRWFile::Read(uint64_t offset, size_t n,
1562 const IOOptions& /*opts*/, Slice* result,
1563 char* scratch, IODebugContext* /*dbg*/) const {
1564 size_t left = n;
1565 char* ptr = scratch;
1566 while (left > 0) {
1567 ssize_t done = pread(fd_, ptr, left, offset);
1568 if (done < 0) {
1569 // error while reading from file
1570 if (errno == EINTR) {
1571 // read was interrupted, try again.
1572 continue;
1573 }
1574 return IOError("While reading random read/write file offset " +
1575 std::to_string(offset) + " len " + std::to_string(n),
1576 filename_, errno);
1577 } else if (done == 0) {
1578 // Nothing more to read
1579 break;
1580 }
1581
1582 // Read `done` bytes
1583 ptr += done;
1584 offset += done;
1585 left -= done;
1586 }
1587
1588 *result = Slice(scratch, n - left);
1589 return IOStatus::OK();
1590 }
1591
IOStatus PosixRandomRWFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  // No-op: Write() issues pwrite() directly; nothing is buffered here.
  return IOStatus::OK();
}
1596
// Flushes file data to stable storage (fdatasync — metadata not
// guaranteed), or the stronger F_FULLFSYNC where available.
IOStatus PosixRandomRWFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
#ifdef HAVE_FULLFSYNC
  if (::fcntl(fd_, F_FULLFSYNC) < 0) {
    return IOError("while fcntl(F_FULLFSYNC) random rw file", filename_, errno);
  }
#else  // HAVE_FULLFSYNC
  if (fdatasync(fd_) < 0) {
    return IOError("While fdatasync random read/write file", filename_, errno);
  }
#endif  // HAVE_FULLFSYNC
  return IOStatus::OK();
}
1610
1611 IOStatus PosixRandomRWFile::Fsync(const IOOptions& /*opts*/,
1612 IODebugContext* /*dbg*/) {
1613 #ifdef HAVE_FULLFSYNC
1614 if (::fcntl(fd_, F_FULLFSYNC) < 0) {
1615 return IOError("While fcntl(F_FULLSYNC) random rw file", filename_, errno);
1616 }
1617 #else // HAVE_FULLFSYNC
1618 if (fsync(fd_) < 0) {
1619 return IOError("While fsync random read/write file", filename_, errno);
1620 }
1621 #endif // HAVE_FULLFSYNC
1622 return IOStatus::OK();
1623 }
1624
1625 IOStatus PosixRandomRWFile::Close(const IOOptions& /*opts*/,
1626 IODebugContext* /*dbg*/) {
1627 if (close(fd_) < 0) {
1628 return IOError("While close random read/write file", filename_, errno);
1629 }
1630 fd_ = -1;
1631 return IOStatus::OK();
1632 }
1633
// Releases the mapping on destruction. A munmap failure is ignored —
// destructors cannot report errors.
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
  // TODO should have error handling though not much we can do...
  munmap(this->base_, length_);
}
1638
1639 /*
1640 * PosixDirectory
1641 */
1642 #if !defined(BTRFS_SUPER_MAGIC)
1643 // The magic number for BTRFS is fixed, if it's not defined, define it here
1644 #define BTRFS_SUPER_MAGIC 0x9123683E
1645 #endif
// Wraps an open directory descriptor. On Linux, probes the filesystem type
// once so FsyncWithDirOptions can apply its btrfs-specific shortcuts.
PosixDirectory::PosixDirectory(int fd, const std::string& directory_name)
    : fd_(fd), directory_name_(directory_name) {
  is_btrfs_ = false;
#ifdef OS_LINUX
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  // The cast silences signedness differences of f_type across libc variants.
  is_btrfs_ = (ret == 0 && buf.f_type == static_cast<decltype(buf.f_type)>(
                               BTRFS_SUPER_MAGIC));
#endif
}
1656
1657 PosixDirectory::~PosixDirectory() {
1658 if (fd_ >= 0) {
1659 IOStatus s = PosixDirectory::Close(IOOptions(), nullptr);
1660 s.PermitUncheckedError();
1661 }
1662 }
1663
IOStatus PosixDirectory::Fsync(const IOOptions& opts, IODebugContext* dbg) {
  // Plain Fsync is simply FsyncWithDirOptions with default options.
  return FsyncWithDirOptions(opts, dbg, DirFsyncOptions());
}
1667
// Users who want the directory's file entries durably persisted must call
// Fsync() or FsyncWithDirOptions() before calling Close().
1670 IOStatus PosixDirectory::Close(const IOOptions& /*opts*/,
1671 IODebugContext* /*dbg*/) {
1672 IOStatus s = IOStatus::OK();
1673 if (close(fd_) < 0) {
1674 s = IOError("While closing directory ", directory_name_, errno);
1675 } else {
1676 fd_ = -1;
1677 }
1678 return s;
1679 }
1680
1681 IOStatus PosixDirectory::FsyncWithDirOptions(
1682 const IOOptions& /*opts*/, IODebugContext* /*dbg*/,
1683 const DirFsyncOptions& dir_fsync_options) {
1684 assert(fd_ >= 0); // Check use after close
1685 IOStatus s = IOStatus::OK();
1686 #ifndef OS_AIX
1687 if (is_btrfs_) {
1688 // skip dir fsync for new file creation, which is not needed for btrfs
1689 if (dir_fsync_options.reason == DirFsyncOptions::kNewFileSynced) {
1690 return s;
1691 }
1692 // skip dir fsync for renaming file, only need to sync new file
1693 if (dir_fsync_options.reason == DirFsyncOptions::kFileRenamed) {
1694 std::string new_name = dir_fsync_options.renamed_new_name;
1695 assert(!new_name.empty());
1696 int fd;
1697 do {
1698 IOSTATS_TIMER_GUARD(open_nanos);
1699 fd = open(new_name.c_str(), O_RDONLY);
1700 } while (fd < 0 && errno == EINTR);
1701 if (fd < 0) {
1702 s = IOError("While open renaming file", new_name, errno);
1703 } else if (fsync(fd) < 0) {
1704 s = IOError("While fsync renaming file", new_name, errno);
1705 }
1706 if (close(fd) < 0) {
1707 s = IOError("While closing file after fsync", new_name, errno);
1708 }
1709 return s;
1710 }
1711 // fallback to dir-fsync for kDefault, kDirRenamed and kFileDeleted
1712 }
1713
1714 // skip fsync/fcntl when fd_ == -1 since this file descriptor has been closed
1715 // in either the de-construction or the close function, data must have been
1716 // fsync-ed before de-construction and close is called
1717 #ifdef HAVE_FULLFSYNC
1718 // btrfs is a Linux file system, while currently F_FULLFSYNC is available on
1719 // Mac OS.
1720 assert(!is_btrfs_);
1721 if (fd_ != -1 && ::fcntl(fd_, F_FULLFSYNC) < 0) {
1722 return IOError("while fcntl(F_FULLFSYNC)", "a directory", errno);
1723 }
1724 #else // HAVE_FULLFSYNC
1725 if (fd_ != -1 && fsync(fd_) == -1) {
1726 s = IOError("While fsync", "a directory", errno);
1727 }
1728 #endif // HAVE_FULLFSYNC
1729 #endif // OS_AIX
1730 return s;
1731 }
1732 } // namespace ROCKSDB_NAMESPACE
1733 #endif