1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #ifndef ROCKSDB_LITE
6 
7 #include "utilities/blob_db/blob_log_writer.h"
8 
9 #include <cstdint>
10 #include <string>
11 
12 #include "file/writable_file_writer.h"
13 #include "monitoring/statistics.h"
14 #include "rocksdb/env.h"
15 #include "util/coding.h"
16 #include "util/stop_watch.h"
17 #include "utilities/blob_db/blob_log_format.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 namespace blob_db {
21 
Writer(std::unique_ptr<WritableFileWriter> && dest,Env * env,Statistics * statistics,uint64_t log_number,uint64_t bpsync,bool use_fs,uint64_t boffset)22 Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
23                Statistics* statistics, uint64_t log_number, uint64_t bpsync,
24                bool use_fs, uint64_t boffset)
25     : dest_(std::move(dest)),
26       env_(env),
27       statistics_(statistics),
28       log_number_(log_number),
29       block_offset_(boffset),
30       bytes_per_sync_(bpsync),
31       next_sync_offset_(0),
32       use_fsync_(use_fs),
33       last_elem_type_(kEtNone) {}
34 
Sync()35 Status Writer::Sync() {
36   StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
37   Status s = dest_->Sync(use_fsync_);
38   RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
39   return s;
40 }
41 
WriteHeader(BlobLogHeader & header)42 Status Writer::WriteHeader(BlobLogHeader& header) {
43   assert(block_offset_ == 0);
44   assert(last_elem_type_ == kEtNone);
45   std::string str;
46   header.EncodeTo(&str);
47 
48   Status s = dest_->Append(Slice(str));
49   if (s.ok()) {
50     block_offset_ += str.size();
51     s = dest_->Flush();
52   }
53   last_elem_type_ = kEtFileHdr;
54   RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
55              BlobLogHeader::kSize);
56   return s;
57 }
58 
AppendFooter(BlobLogFooter & footer)59 Status Writer::AppendFooter(BlobLogFooter& footer) {
60   assert(block_offset_ != 0);
61   assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
62 
63   std::string str;
64   footer.EncodeTo(&str);
65 
66   Status s = dest_->Append(Slice(str));
67   if (s.ok()) {
68     block_offset_ += str.size();
69     s = dest_->Close();
70     dest_.reset();
71   }
72 
73   last_elem_type_ = kEtFileFooter;
74   RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
75              BlobLogFooter::kSize);
76   return s;
77 }
78 
AddRecord(const Slice & key,const Slice & val,uint64_t expiration,uint64_t * key_offset,uint64_t * blob_offset)79 Status Writer::AddRecord(const Slice& key, const Slice& val,
80                          uint64_t expiration, uint64_t* key_offset,
81                          uint64_t* blob_offset) {
82   assert(block_offset_ != 0);
83   assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
84 
85   std::string buf;
86   ConstructBlobHeader(&buf, key, val, expiration);
87 
88   Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
89   return s;
90 }
91 
AddRecord(const Slice & key,const Slice & val,uint64_t * key_offset,uint64_t * blob_offset)92 Status Writer::AddRecord(const Slice& key, const Slice& val,
93                          uint64_t* key_offset, uint64_t* blob_offset) {
94   assert(block_offset_ != 0);
95   assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
96 
97   std::string buf;
98   ConstructBlobHeader(&buf, key, val, 0);
99 
100   Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
101   return s;
102 }
103 
ConstructBlobHeader(std::string * buf,const Slice & key,const Slice & val,uint64_t expiration)104 void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
105                                  const Slice& val, uint64_t expiration) {
106   BlobLogRecord record;
107   record.key = key;
108   record.value = val;
109   record.expiration = expiration;
110   record.EncodeHeaderTo(buf);
111 }
112 
EmitPhysicalRecord(const std::string & headerbuf,const Slice & key,const Slice & val,uint64_t * key_offset,uint64_t * blob_offset)113 Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
114                                   const Slice& key, const Slice& val,
115                                   uint64_t* key_offset, uint64_t* blob_offset) {
116   StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
117   Status s = dest_->Append(Slice(headerbuf));
118   if (s.ok()) {
119     s = dest_->Append(key);
120   }
121   if (s.ok()) {
122     s = dest_->Append(val);
123   }
124   if (s.ok()) {
125     s = dest_->Flush();
126   }
127 
128   *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
129   *blob_offset = *key_offset + key.size();
130   block_offset_ = *blob_offset + val.size();
131   last_elem_type_ = kEtRecord;
132   RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
133              BlobLogRecord::kHeaderSize + key.size() + val.size());
134   return s;
135 }
136 
137 }  // namespace blob_db
138 }  // namespace ROCKSDB_NAMESPACE
139 #endif  // ROCKSDB_LITE
140