]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/env/composite_env.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / env / composite_env.cc
CommitLineData
1e59de90
TL
1// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6#include "env/composite_env_wrapper.h"
7#include "rocksdb/utilities/options_type.h"
8#include "util/string_util.h"
9
10namespace ROCKSDB_NAMESPACE {
11namespace {
12// The CompositeEnvWrapper class provides an interface that is compatible
13// with the old monolithic Env API, and an implementation that wraps around
14// the new Env that provides threading and other OS related functionality, and
15// the new FileSystem API that provides storage functionality. By
16// providing the old Env interface, it allows the rest of RocksDB code to
17// be agnostic of whether the underlying Env implementation is a monolithic
18// Env or an Env + FileSystem. In the former case, the user will specify
19// Options::env only, whereas in the latter case, the user will specify
20// Options::env and Options::file_system.
21
22class CompositeSequentialFileWrapper : public SequentialFile {
23 public:
24 explicit CompositeSequentialFileWrapper(
25 std::unique_ptr<FSSequentialFile>& target)
26 : target_(std::move(target)) {}
27
28 Status Read(size_t n, Slice* result, char* scratch) override {
29 IOOptions io_opts;
30 IODebugContext dbg;
31 return target_->Read(n, io_opts, result, scratch, &dbg);
32 }
33 Status Skip(uint64_t n) override { return target_->Skip(n); }
34 bool use_direct_io() const override { return target_->use_direct_io(); }
35 size_t GetRequiredBufferAlignment() const override {
36 return target_->GetRequiredBufferAlignment();
37 }
38 Status InvalidateCache(size_t offset, size_t length) override {
39 return target_->InvalidateCache(offset, length);
40 }
41 Status PositionedRead(uint64_t offset, size_t n, Slice* result,
42 char* scratch) override {
43 IOOptions io_opts;
44 IODebugContext dbg;
45 return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
46 }
47
48 private:
49 std::unique_ptr<FSSequentialFile> target_;
50};
51
52class CompositeRandomAccessFileWrapper : public RandomAccessFile {
53 public:
54 explicit CompositeRandomAccessFileWrapper(
55 std::unique_ptr<FSRandomAccessFile>& target)
56 : target_(std::move(target)) {}
57
58 Status Read(uint64_t offset, size_t n, Slice* result,
59 char* scratch) const override {
60 IOOptions io_opts;
61 IODebugContext dbg;
62 return target_->Read(offset, n, io_opts, result, scratch, &dbg);
63 }
64 Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
65 IOOptions io_opts;
66 IODebugContext dbg;
67 std::vector<FSReadRequest> fs_reqs;
68 Status status;
69
70 fs_reqs.resize(num_reqs);
71 for (size_t i = 0; i < num_reqs; ++i) {
72 fs_reqs[i].offset = reqs[i].offset;
73 fs_reqs[i].len = reqs[i].len;
74 fs_reqs[i].scratch = reqs[i].scratch;
75 fs_reqs[i].status = IOStatus::OK();
76 }
77 status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
78 for (size_t i = 0; i < num_reqs; ++i) {
79 reqs[i].result = fs_reqs[i].result;
80 reqs[i].status = fs_reqs[i].status;
81 }
82 return status;
83 }
84 Status Prefetch(uint64_t offset, size_t n) override {
85 IOOptions io_opts;
86 IODebugContext dbg;
87 return target_->Prefetch(offset, n, io_opts, &dbg);
88 }
89 size_t GetUniqueId(char* id, size_t max_size) const override {
90 return target_->GetUniqueId(id, max_size);
91 }
92 void Hint(AccessPattern pattern) override {
93 target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
94 }
95 bool use_direct_io() const override { return target_->use_direct_io(); }
96 size_t GetRequiredBufferAlignment() const override {
97 return target_->GetRequiredBufferAlignment();
98 }
99 Status InvalidateCache(size_t offset, size_t length) override {
100 return target_->InvalidateCache(offset, length);
101 }
102
103 private:
104 std::unique_ptr<FSRandomAccessFile> target_;
105};
106
107class CompositeWritableFileWrapper : public WritableFile {
108 public:
109 explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
110 : target_(std::move(t)) {}
111
112 Status Append(const Slice& data) override {
113 IOOptions io_opts;
114 IODebugContext dbg;
115 return target_->Append(data, io_opts, &dbg);
116 }
117 Status Append(const Slice& data,
118 const DataVerificationInfo& verification_info) override {
119 IOOptions io_opts;
120 IODebugContext dbg;
121 return target_->Append(data, io_opts, verification_info, &dbg);
122 }
123 Status PositionedAppend(const Slice& data, uint64_t offset) override {
124 IOOptions io_opts;
125 IODebugContext dbg;
126 return target_->PositionedAppend(data, offset, io_opts, &dbg);
127 }
128 Status PositionedAppend(
129 const Slice& data, uint64_t offset,
130 const DataVerificationInfo& verification_info) override {
131 IOOptions io_opts;
132 IODebugContext dbg;
133 return target_->PositionedAppend(data, offset, io_opts, verification_info,
134 &dbg);
135 }
136 Status Truncate(uint64_t size) override {
137 IOOptions io_opts;
138 IODebugContext dbg;
139 return target_->Truncate(size, io_opts, &dbg);
140 }
141 Status Close() override {
142 IOOptions io_opts;
143 IODebugContext dbg;
144 return target_->Close(io_opts, &dbg);
145 }
146 Status Flush() override {
147 IOOptions io_opts;
148 IODebugContext dbg;
149 return target_->Flush(io_opts, &dbg);
150 }
151 Status Sync() override {
152 IOOptions io_opts;
153 IODebugContext dbg;
154 return target_->Sync(io_opts, &dbg);
155 }
156 Status Fsync() override {
157 IOOptions io_opts;
158 IODebugContext dbg;
159 return target_->Fsync(io_opts, &dbg);
160 }
161 bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
162
163 bool use_direct_io() const override { return target_->use_direct_io(); }
164
165 size_t GetRequiredBufferAlignment() const override {
166 return target_->GetRequiredBufferAlignment();
167 }
168
169 void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
170 target_->SetWriteLifeTimeHint(hint);
171 }
172
173 Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
174 return target_->GetWriteLifeTimeHint();
175 }
176
177 uint64_t GetFileSize() override {
178 IOOptions io_opts;
179 IODebugContext dbg;
180 return target_->GetFileSize(io_opts, &dbg);
181 }
182
183 void SetPreallocationBlockSize(size_t size) override {
184 target_->SetPreallocationBlockSize(size);
185 }
186
187 void GetPreallocationStatus(size_t* block_size,
188 size_t* last_allocated_block) override {
189 target_->GetPreallocationStatus(block_size, last_allocated_block);
190 }
191
192 size_t GetUniqueId(char* id, size_t max_size) const override {
193 return target_->GetUniqueId(id, max_size);
194 }
195
196 Status InvalidateCache(size_t offset, size_t length) override {
197 return target_->InvalidateCache(offset, length);
198 }
199
200 Status RangeSync(uint64_t offset, uint64_t nbytes) override {
201 IOOptions io_opts;
202 IODebugContext dbg;
203 return target_->RangeSync(offset, nbytes, io_opts, &dbg);
204 }
205
206 void PrepareWrite(size_t offset, size_t len) override {
207 IOOptions io_opts;
208 IODebugContext dbg;
209 target_->PrepareWrite(offset, len, io_opts, &dbg);
210 }
211
212 Status Allocate(uint64_t offset, uint64_t len) override {
213 IOOptions io_opts;
214 IODebugContext dbg;
215 return target_->Allocate(offset, len, io_opts, &dbg);
216 }
217
218 std::unique_ptr<FSWritableFile>* target() { return &target_; }
219
220 private:
221 std::unique_ptr<FSWritableFile> target_;
222};
223
224class CompositeRandomRWFileWrapper : public RandomRWFile {
225 public:
226 explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
227 : target_(std::move(target)) {}
228
229 bool use_direct_io() const override { return target_->use_direct_io(); }
230 size_t GetRequiredBufferAlignment() const override {
231 return target_->GetRequiredBufferAlignment();
232 }
233 Status Write(uint64_t offset, const Slice& data) override {
234 IOOptions io_opts;
235 IODebugContext dbg;
236 return target_->Write(offset, data, io_opts, &dbg);
237 }
238 Status Read(uint64_t offset, size_t n, Slice* result,
239 char* scratch) const override {
240 IOOptions io_opts;
241 IODebugContext dbg;
242 return target_->Read(offset, n, io_opts, result, scratch, &dbg);
243 }
244 Status Flush() override {
245 IOOptions io_opts;
246 IODebugContext dbg;
247 return target_->Flush(io_opts, &dbg);
248 }
249 Status Sync() override {
250 IOOptions io_opts;
251 IODebugContext dbg;
252 return target_->Sync(io_opts, &dbg);
253 }
254 Status Fsync() override {
255 IOOptions io_opts;
256 IODebugContext dbg;
257 return target_->Fsync(io_opts, &dbg);
258 }
259 Status Close() override {
260 IOOptions io_opts;
261 IODebugContext dbg;
262 return target_->Close(io_opts, &dbg);
263 }
264
265 private:
266 std::unique_ptr<FSRandomRWFile> target_;
267};
268
269class CompositeDirectoryWrapper : public Directory {
270 public:
271 explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
272 : target_(std::move(target)) {}
273
274 Status Fsync() override {
275 IOOptions io_opts;
276 IODebugContext dbg;
277 return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
278 }
279
280 Status Close() override {
281 IOOptions io_opts;
282 IODebugContext dbg;
283 return target_->Close(io_opts, &dbg);
284 }
285
286 size_t GetUniqueId(char* id, size_t max_size) const override {
287 return target_->GetUniqueId(id, max_size);
288 }
289
290 private:
291 std::unique_ptr<FSDirectory> target_;
292};
293} // namespace
294
295Status CompositeEnv::NewSequentialFile(const std::string& f,
296 std::unique_ptr<SequentialFile>* r,
297 const EnvOptions& options) {
298 IODebugContext dbg;
299 std::unique_ptr<FSSequentialFile> file;
300 Status status;
301 status =
302 file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
303 if (status.ok()) {
304 r->reset(new CompositeSequentialFileWrapper(file));
305 }
306 return status;
307}
308
309Status CompositeEnv::NewRandomAccessFile(const std::string& f,
310 std::unique_ptr<RandomAccessFile>* r,
311 const EnvOptions& options) {
312 IODebugContext dbg;
313 std::unique_ptr<FSRandomAccessFile> file;
314 Status status;
315 status =
316 file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
317 if (status.ok()) {
318 r->reset(new CompositeRandomAccessFileWrapper(file));
319 }
320 return status;
321}
322
323Status CompositeEnv::NewWritableFile(const std::string& f,
324 std::unique_ptr<WritableFile>* r,
325 const EnvOptions& options) {
326 IODebugContext dbg;
327 std::unique_ptr<FSWritableFile> file;
328 Status status;
329 status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
330 if (status.ok()) {
331 r->reset(new CompositeWritableFileWrapper(file));
332 }
333 return status;
334}
335
336Status CompositeEnv::ReopenWritableFile(const std::string& fname,
337 std::unique_ptr<WritableFile>* result,
338 const EnvOptions& options) {
339 IODebugContext dbg;
340 Status status;
341 std::unique_ptr<FSWritableFile> file;
342 status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
343 &dbg);
344 if (status.ok()) {
345 result->reset(new CompositeWritableFileWrapper(file));
346 }
347 return status;
348}
349
350Status CompositeEnv::ReuseWritableFile(const std::string& fname,
351 const std::string& old_fname,
352 std::unique_ptr<WritableFile>* r,
353 const EnvOptions& options) {
354 IODebugContext dbg;
355 Status status;
356 std::unique_ptr<FSWritableFile> file;
357 status = file_system_->ReuseWritableFile(fname, old_fname,
358 FileOptions(options), &file, &dbg);
359 if (status.ok()) {
360 r->reset(new CompositeWritableFileWrapper(file));
361 }
362 return status;
363}
364
365Status CompositeEnv::NewRandomRWFile(const std::string& fname,
366 std::unique_ptr<RandomRWFile>* result,
367 const EnvOptions& options) {
368 IODebugContext dbg;
369 std::unique_ptr<FSRandomRWFile> file;
370 Status status;
371 status =
372 file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
373 if (status.ok()) {
374 result->reset(new CompositeRandomRWFileWrapper(file));
375 }
376 return status;
377}
378
379Status CompositeEnv::NewDirectory(const std::string& name,
380 std::unique_ptr<Directory>* result) {
381 IOOptions io_opts;
382 IODebugContext dbg;
383 std::unique_ptr<FSDirectory> dir;
384 Status status;
385 status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
386 if (status.ok()) {
387 result->reset(new CompositeDirectoryWrapper(dir));
388 }
389 return status;
390}
391
392namespace {
393static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
394#ifndef ROCKSDB_LITE
395 {"target",
396 OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName,
397 OptionTypeFlags::kDontSerialize)
398 .SetParseFunc([](const ConfigOptions& opts,
399 const std::string& /*name*/, const std::string& value,
400 void* addr) {
401 auto target = static_cast<EnvWrapper::Target*>(addr);
402 return Env::CreateFromString(opts, value, &(target->env),
403 &(target->guard));
404 })
405 .SetEqualsFunc([](const ConfigOptions& opts,
406 const std::string& /*name*/, const void* addr1,
407 const void* addr2, std::string* mismatch) {
408 const auto target1 = static_cast<const EnvWrapper::Target*>(addr1);
409 const auto target2 = static_cast<const EnvWrapper::Target*>(addr2);
410 if (target1->env != nullptr) {
411 return target1->env->AreEquivalent(opts, target2->env, mismatch);
412 } else {
413 return (target2->env == nullptr);
414 }
415 })
416 .SetPrepareFunc([](const ConfigOptions& opts,
417 const std::string& /*name*/, void* addr) {
418 auto target = static_cast<EnvWrapper::Target*>(addr);
419 if (target->guard.get() != nullptr) {
420 target->env = target->guard.get();
421 } else if (target->env == nullptr) {
422 target->env = Env::Default();
423 }
424 return target->env->PrepareOptions(opts);
425 })
426 .SetValidateFunc([](const DBOptions& db_opts,
427 const ColumnFamilyOptions& cf_opts,
428 const std::string& /*name*/, const void* addr) {
429 const auto target = static_cast<const EnvWrapper::Target*>(addr);
430 if (target->env == nullptr) {
431 return Status::InvalidArgument("Target Env not specified");
432 } else {
433 return target->env->ValidateOptions(db_opts, cf_opts);
434 }
435 })},
436#endif // ROCKSDB_LITE
437};
438static std::unordered_map<std::string, OptionTypeInfo>
439 composite_fs_wrapper_type_info = {
440#ifndef ROCKSDB_LITE
441 {"file_system",
442 OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
443 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
444#endif // ROCKSDB_LITE
445};
446
447static std::unordered_map<std::string, OptionTypeInfo>
448 composite_clock_wrapper_type_info = {
449#ifndef ROCKSDB_LITE
450 {"clock",
451 OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
452 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
453#endif // ROCKSDB_LITE
454};
455
456} // namespace
457
458std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
459 return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
460}
461
462CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
463 const std::shared_ptr<FileSystem>& fs,
464 const std::shared_ptr<SystemClock>& sc)
465 : CompositeEnv(fs, sc), target_(env) {
466 RegisterOptions("", &target_, &env_wrapper_type_info);
467 RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
468 RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
469}
470
471CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
472 const std::shared_ptr<FileSystem>& fs,
473 const std::shared_ptr<SystemClock>& sc)
474 : CompositeEnv(fs, sc), target_(env) {
475 RegisterOptions("", &target_, &env_wrapper_type_info);
476 RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
477 RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
478}
479
480Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
481 target_.Prepare();
482 if (file_system_ == nullptr) {
483 file_system_ = target_.env->GetFileSystem();
484 }
485 if (system_clock_ == nullptr) {
486 system_clock_ = target_.env->GetSystemClock();
487 }
488 return Env::PrepareOptions(options);
489}
490
491#ifndef ROCKSDB_LITE
492std::string CompositeEnvWrapper::SerializeOptions(
493 const ConfigOptions& config_options, const std::string& header) const {
494 auto options = CompositeEnv::SerializeOptions(config_options, header);
495 if (target_.env != nullptr && target_.env != Env::Default()) {
496 options.append("target=");
497 options.append(target_.env->ToString(config_options));
498 }
499 return options;
500}
501#endif // ROCKSDB_LITE
502
503EnvWrapper::EnvWrapper(Env* t) : target_(t) {
504 RegisterOptions("", &target_, &env_wrapper_type_info);
505}
506
507EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) {
508 RegisterOptions("", &target_, &env_wrapper_type_info);
509}
510
511EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) {
512 RegisterOptions("", &target_, &env_wrapper_type_info);
513}
514
515EnvWrapper::~EnvWrapper() {}
516
517Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
518 target_.Prepare();
519 return Env::PrepareOptions(options);
520}
521
522#ifndef ROCKSDB_LITE
523std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
524 const std::string& header) const {
525 auto parent = Env::SerializeOptions(config_options, "");
526 if (config_options.IsShallow() || target_.env == nullptr ||
527 target_.env == Env::Default()) {
528 return parent;
529 } else {
530 std::string result = header;
531 if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
532 result.append(OptionTypeInfo::kIdPropName()).append("=");
533 }
534 result.append(parent);
535 if (!EndsWith(result, config_options.delimiter)) {
536 result.append(config_options.delimiter);
537 }
538 result.append("target=").append(target_.env->ToString(config_options));
539 return result;
540 }
541}
542#endif // ROCKSDB_LITE
543
544} // namespace ROCKSDB_NAMESPACE