]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_log_writer.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_log_writer.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #ifndef ROCKSDB_LITE
6
7 #include "utilities/blob_db/blob_log_writer.h"
8
9 #include <cstdint>
10 #include <string>
11
12 #include "file/writable_file_writer.h"
13 #include "monitoring/statistics.h"
14 #include "rocksdb/env.h"
15 #include "util/coding.h"
16 #include "util/stop_watch.h"
17 #include "utilities/blob_db/blob_log_format.h"
18
19 namespace ROCKSDB_NAMESPACE {
20 namespace blob_db {
21
22 Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
23 Statistics* statistics, uint64_t log_number, uint64_t bpsync,
24 bool use_fs, uint64_t boffset)
25 : dest_(std::move(dest)),
26 env_(env),
27 statistics_(statistics),
28 log_number_(log_number),
29 block_offset_(boffset),
30 bytes_per_sync_(bpsync),
31 next_sync_offset_(0),
32 use_fsync_(use_fs),
33 last_elem_type_(kEtNone) {}
34
35 Status Writer::Sync() {
36 StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
37 Status s = dest_->Sync(use_fsync_);
38 RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
39 return s;
40 }
41
42 Status Writer::WriteHeader(BlobLogHeader& header) {
43 assert(block_offset_ == 0);
44 assert(last_elem_type_ == kEtNone);
45 std::string str;
46 header.EncodeTo(&str);
47
48 Status s = dest_->Append(Slice(str));
49 if (s.ok()) {
50 block_offset_ += str.size();
51 s = dest_->Flush();
52 }
53 last_elem_type_ = kEtFileHdr;
54 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
55 BlobLogHeader::kSize);
56 return s;
57 }
58
59 Status Writer::AppendFooter(BlobLogFooter& footer) {
60 assert(block_offset_ != 0);
61 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
62
63 std::string str;
64 footer.EncodeTo(&str);
65
66 Status s = dest_->Append(Slice(str));
67 if (s.ok()) {
68 block_offset_ += str.size();
69 s = dest_->Close();
70 dest_.reset();
71 }
72
73 last_elem_type_ = kEtFileFooter;
74 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
75 BlobLogFooter::kSize);
76 return s;
77 }
78
79 Status Writer::AddRecord(const Slice& key, const Slice& val,
80 uint64_t expiration, uint64_t* key_offset,
81 uint64_t* blob_offset) {
82 assert(block_offset_ != 0);
83 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
84
85 std::string buf;
86 ConstructBlobHeader(&buf, key, val, expiration);
87
88 Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
89 return s;
90 }
91
92 Status Writer::AddRecord(const Slice& key, const Slice& val,
93 uint64_t* key_offset, uint64_t* blob_offset) {
94 assert(block_offset_ != 0);
95 assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
96
97 std::string buf;
98 ConstructBlobHeader(&buf, key, val, 0);
99
100 Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
101 return s;
102 }
103
104 void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
105 const Slice& val, uint64_t expiration) {
106 BlobLogRecord record;
107 record.key = key;
108 record.value = val;
109 record.expiration = expiration;
110 record.EncodeHeaderTo(buf);
111 }
112
113 Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
114 const Slice& key, const Slice& val,
115 uint64_t* key_offset, uint64_t* blob_offset) {
116 StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
117 Status s = dest_->Append(Slice(headerbuf));
118 if (s.ok()) {
119 s = dest_->Append(key);
120 }
121 if (s.ok()) {
122 s = dest_->Append(val);
123 }
124 if (s.ok()) {
125 s = dest_->Flush();
126 }
127
128 *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
129 *blob_offset = *key_offset + key.size();
130 block_offset_ = *blob_offset + val.size();
131 last_elem_type_ = kEtRecord;
132 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
133 BlobLogRecord::kHeaderSize + key.size() + val.size());
134 return s;
135 }
136
137 } // namespace blob_db
138 } // namespace ROCKSDB_NAMESPACE
139 #endif // ROCKSDB_LITE