]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | #ifndef ROCKSDB_LITE | |
6 | ||
7 | #include "utilities/blob_db/blob_log_writer.h" | |
8 | ||
9 | #include <cstdint> | |
10 | #include <string> | |
11 | ||
12 | #include "monitoring/statistics.h" | |
13 | #include "rocksdb/env.h" | |
14 | #include "util/coding.h" | |
15 | #include "util/file_reader_writer.h" | |
16 | #include "util/stop_watch.h" | |
17 | #include "utilities/blob_db/blob_log_format.h" | |
18 | ||
19 | namespace rocksdb { | |
20 | namespace blob_db { | |
21 | ||
494da23a | 22 | Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env, |
11fdf7f2 TL |
23 | Statistics* statistics, uint64_t log_number, uint64_t bpsync, |
24 | bool use_fs, uint64_t boffset) | |
25 | : dest_(std::move(dest)), | |
26 | env_(env), | |
27 | statistics_(statistics), | |
28 | log_number_(log_number), | |
29 | block_offset_(boffset), | |
30 | bytes_per_sync_(bpsync), | |
31 | next_sync_offset_(0), | |
32 | use_fsync_(use_fs), | |
33 | last_elem_type_(kEtNone) {} | |
34 | ||
35 | Status Writer::Sync() { | |
36 | StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); | |
37 | Status s = dest_->Sync(use_fsync_); | |
38 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); | |
39 | return s; | |
40 | } | |
41 | ||
42 | Status Writer::WriteHeader(BlobLogHeader& header) { | |
43 | assert(block_offset_ == 0); | |
44 | assert(last_elem_type_ == kEtNone); | |
45 | std::string str; | |
46 | header.EncodeTo(&str); | |
47 | ||
48 | Status s = dest_->Append(Slice(str)); | |
49 | if (s.ok()) { | |
50 | block_offset_ += str.size(); | |
51 | s = dest_->Flush(); | |
52 | } | |
53 | last_elem_type_ = kEtFileHdr; | |
54 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, | |
55 | BlobLogHeader::kSize); | |
56 | return s; | |
57 | } | |
58 | ||
59 | Status Writer::AppendFooter(BlobLogFooter& footer) { | |
60 | assert(block_offset_ != 0); | |
61 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); | |
62 | ||
63 | std::string str; | |
64 | footer.EncodeTo(&str); | |
65 | ||
66 | Status s = dest_->Append(Slice(str)); | |
67 | if (s.ok()) { | |
68 | block_offset_ += str.size(); | |
69 | s = dest_->Close(); | |
70 | dest_.reset(); | |
71 | } | |
72 | ||
73 | last_elem_type_ = kEtFileFooter; | |
74 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, | |
75 | BlobLogFooter::kSize); | |
76 | return s; | |
77 | } | |
78 | ||
79 | Status Writer::AddRecord(const Slice& key, const Slice& val, | |
80 | uint64_t expiration, uint64_t* key_offset, | |
81 | uint64_t* blob_offset) { | |
82 | assert(block_offset_ != 0); | |
83 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); | |
84 | ||
85 | std::string buf; | |
86 | ConstructBlobHeader(&buf, key, val, expiration); | |
87 | ||
88 | Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); | |
89 | return s; | |
90 | } | |
91 | ||
92 | Status Writer::AddRecord(const Slice& key, const Slice& val, | |
93 | uint64_t* key_offset, uint64_t* blob_offset) { | |
94 | assert(block_offset_ != 0); | |
95 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); | |
96 | ||
97 | std::string buf; | |
98 | ConstructBlobHeader(&buf, key, val, 0); | |
99 | ||
100 | Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); | |
101 | return s; | |
102 | } | |
103 | ||
104 | void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, | |
105 | const Slice& val, uint64_t expiration) { | |
106 | BlobLogRecord record; | |
107 | record.key = key; | |
108 | record.value = val; | |
109 | record.expiration = expiration; | |
110 | record.EncodeHeaderTo(buf); | |
111 | } | |
112 | ||
113 | Status Writer::EmitPhysicalRecord(const std::string& headerbuf, | |
114 | const Slice& key, const Slice& val, | |
115 | uint64_t* key_offset, uint64_t* blob_offset) { | |
116 | StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS); | |
117 | Status s = dest_->Append(Slice(headerbuf)); | |
118 | if (s.ok()) { | |
119 | s = dest_->Append(key); | |
120 | } | |
121 | if (s.ok()) { | |
122 | s = dest_->Append(val); | |
123 | } | |
124 | if (s.ok()) { | |
125 | s = dest_->Flush(); | |
126 | } | |
127 | ||
128 | *key_offset = block_offset_ + BlobLogRecord::kHeaderSize; | |
129 | *blob_offset = *key_offset + key.size(); | |
130 | block_offset_ = *blob_offset + val.size(); | |
131 | last_elem_type_ = kEtRecord; | |
132 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, | |
133 | BlobLogRecord::kHeaderSize + key.size() + val.size()); | |
134 | return s; | |
135 | } | |
136 | ||
137 | } // namespace blob_db | |
138 | } // namespace rocksdb | |
139 | #endif // ROCKSDB_LITE |