]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/blob_db/blob_log_writer.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_log_writer.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5#ifndef ROCKSDB_LITE
6
7#include "utilities/blob_db/blob_log_writer.h"
8
9#include <cstdint>
10#include <string>
11
12#include "monitoring/statistics.h"
13#include "rocksdb/env.h"
14#include "util/coding.h"
15#include "util/file_reader_writer.h"
16#include "util/stop_watch.h"
17#include "utilities/blob_db/blob_log_format.h"
18
19namespace rocksdb {
20namespace blob_db {
21
494da23a 22Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
11fdf7f2
TL
23 Statistics* statistics, uint64_t log_number, uint64_t bpsync,
24 bool use_fs, uint64_t boffset)
25 : dest_(std::move(dest)),
26 env_(env),
27 statistics_(statistics),
28 log_number_(log_number),
29 block_offset_(boffset),
30 bytes_per_sync_(bpsync),
31 next_sync_offset_(0),
32 use_fsync_(use_fs),
33 last_elem_type_(kEtNone) {}
34
35Status Writer::Sync() {
36 StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
37 Status s = dest_->Sync(use_fsync_);
38 RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
39 return s;
40}
41
42Status Writer::WriteHeader(BlobLogHeader& header) {
43 assert(block_offset_ == 0);
44 assert(last_elem_type_ == kEtNone);
45 std::string str;
46 header.EncodeTo(&str);
47
48 Status s = dest_->Append(Slice(str));
49 if (s.ok()) {
50 block_offset_ += str.size();
51 s = dest_->Flush();
52 }
53 last_elem_type_ = kEtFileHdr;
54 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
55 BlobLogHeader::kSize);
56 return s;
57}
58
59Status Writer::AppendFooter(BlobLogFooter& footer) {
60 assert(block_offset_ != 0);
61 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
62
63 std::string str;
64 footer.EncodeTo(&str);
65
66 Status s = dest_->Append(Slice(str));
67 if (s.ok()) {
68 block_offset_ += str.size();
69 s = dest_->Close();
70 dest_.reset();
71 }
72
73 last_elem_type_ = kEtFileFooter;
74 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
75 BlobLogFooter::kSize);
76 return s;
77}
78
79Status Writer::AddRecord(const Slice& key, const Slice& val,
80 uint64_t expiration, uint64_t* key_offset,
81 uint64_t* blob_offset) {
82 assert(block_offset_ != 0);
83 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
84
85 std::string buf;
86 ConstructBlobHeader(&buf, key, val, expiration);
87
88 Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
89 return s;
90}
91
92Status Writer::AddRecord(const Slice& key, const Slice& val,
93 uint64_t* key_offset, uint64_t* blob_offset) {
94 assert(block_offset_ != 0);
95 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
96
97 std::string buf;
98 ConstructBlobHeader(&buf, key, val, 0);
99
100 Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
101 return s;
102}
103
104void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
105 const Slice& val, uint64_t expiration) {
106 BlobLogRecord record;
107 record.key = key;
108 record.value = val;
109 record.expiration = expiration;
110 record.EncodeHeaderTo(buf);
111}
112
113Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
114 const Slice& key, const Slice& val,
115 uint64_t* key_offset, uint64_t* blob_offset) {
116 StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
117 Status s = dest_->Append(Slice(headerbuf));
118 if (s.ok()) {
119 s = dest_->Append(key);
120 }
121 if (s.ok()) {
122 s = dest_->Append(val);
123 }
124 if (s.ok()) {
125 s = dest_->Flush();
126 }
127
128 *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
129 *blob_offset = *key_offset + key.size();
130 block_offset_ = *blob_offset + val.size();
131 last_elem_type_ = kEtRecord;
132 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
133 BlobLogRecord::kHeaderSize + key.size() + val.size());
134 return s;
135}
136
137} // namespace blob_db
138} // namespace rocksdb
139#endif // ROCKSDB_LITE