1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include "utilities/blob_db/blob_log_writer.h"

#include <string>

#include "file/writable_file_writer.h"
#include "monitoring/statistics.h"
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "utilities/blob_db/blob_log_format.h"
19 namespace ROCKSDB_NAMESPACE
{
22 Writer::Writer(std::unique_ptr
<WritableFileWriter
>&& dest
, Env
* env
,
23 Statistics
* statistics
, uint64_t log_number
, uint64_t bpsync
,
24 bool use_fs
, uint64_t boffset
)
25 : dest_(std::move(dest
)),
27 statistics_(statistics
),
28 log_number_(log_number
),
29 block_offset_(boffset
),
30 bytes_per_sync_(bpsync
),
33 last_elem_type_(kEtNone
) {}
35 Status
Writer::Sync() {
36 StopWatch
sync_sw(env_
, statistics_
, BLOB_DB_BLOB_FILE_SYNC_MICROS
);
37 Status s
= dest_
->Sync(use_fsync_
);
38 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_SYNCED
);
42 Status
Writer::WriteHeader(BlobLogHeader
& header
) {
43 assert(block_offset_
== 0);
44 assert(last_elem_type_
== kEtNone
);
46 header
.EncodeTo(&str
);
48 Status s
= dest_
->Append(Slice(str
));
50 block_offset_
+= str
.size();
53 last_elem_type_
= kEtFileHdr
;
54 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_BYTES_WRITTEN
,
55 BlobLogHeader::kSize
);
59 Status
Writer::AppendFooter(BlobLogFooter
& footer
) {
60 assert(block_offset_
!= 0);
61 assert(last_elem_type_
== kEtFileHdr
|| last_elem_type_
== kEtRecord
);
64 footer
.EncodeTo(&str
);
66 Status s
= dest_
->Append(Slice(str
));
68 block_offset_
+= str
.size();
73 last_elem_type_
= kEtFileFooter
;
74 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_BYTES_WRITTEN
,
75 BlobLogFooter::kSize
);
79 Status
Writer::AddRecord(const Slice
& key
, const Slice
& val
,
80 uint64_t expiration
, uint64_t* key_offset
,
81 uint64_t* blob_offset
) {
82 assert(block_offset_
!= 0);
83 assert(last_elem_type_
== kEtFileHdr
|| last_elem_type_
== kEtRecord
);
86 ConstructBlobHeader(&buf
, key
, val
, expiration
);
88 Status s
= EmitPhysicalRecord(buf
, key
, val
, key_offset
, blob_offset
);
92 Status
Writer::AddRecord(const Slice
& key
, const Slice
& val
,
93 uint64_t* key_offset
, uint64_t* blob_offset
) {
94 assert(block_offset_
!= 0);
95 assert(last_elem_type_
== kEtFileHdr
|| last_elem_type_
== kEtRecord
);
98 ConstructBlobHeader(&buf
, key
, val
, 0);
100 Status s
= EmitPhysicalRecord(buf
, key
, val
, key_offset
, blob_offset
);
104 void Writer::ConstructBlobHeader(std::string
* buf
, const Slice
& key
,
105 const Slice
& val
, uint64_t expiration
) {
106 BlobLogRecord record
;
109 record
.expiration
= expiration
;
110 record
.EncodeHeaderTo(buf
);
113 Status
Writer::EmitPhysicalRecord(const std::string
& headerbuf
,
114 const Slice
& key
, const Slice
& val
,
115 uint64_t* key_offset
, uint64_t* blob_offset
) {
116 StopWatch
write_sw(env_
, statistics_
, BLOB_DB_BLOB_FILE_WRITE_MICROS
);
117 Status s
= dest_
->Append(Slice(headerbuf
));
119 s
= dest_
->Append(key
);
122 s
= dest_
->Append(val
);
128 *key_offset
= block_offset_
+ BlobLogRecord::kHeaderSize
;
129 *blob_offset
= *key_offset
+ key
.size();
130 block_offset_
= *blob_offset
+ val
.size();
131 last_elem_type_
= kEtRecord
;
132 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_BYTES_WRITTEN
,
133 BlobLogRecord::kHeaderSize
+ key
.size() + val
.size());
137 } // namespace blob_db
138 } // namespace ROCKSDB_NAMESPACE
139 #endif // ROCKSDB_LITE