1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/event_helpers.h"
7
8 #include "rocksdb/convenience.h"
9 #include "rocksdb/listener.h"
10 #include "rocksdb/utilities/customizable_util.h"
11
12 namespace ROCKSDB_NAMESPACE {
13 #ifndef ROCKSDB_LITE
CreateFromString(const ConfigOptions & config_options,const std::string & id,std::shared_ptr<EventListener> * result)14 Status EventListener::CreateFromString(const ConfigOptions& config_options,
15 const std::string& id,
16 std::shared_ptr<EventListener>* result) {
17 return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
18 }
19 #endif // ROCKSDB_LITE
20
21 namespace {
// Returns `a / b`, or zero when the divisor `b` compares equal to zero, so
// callers never perform a division by zero.
template <class T>
inline T SafeDivide(T a, T b) {
  if (b == 0) {
    return 0;
  }
  return a / b;
}
26 } // namespace
27
AppendCurrentTime(JSONWriter * jwriter)28 void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
29 *jwriter << "time_micros"
30 << std::chrono::duration_cast<std::chrono::microseconds>(
31 std::chrono::system_clock::now().time_since_epoch())
32 .count();
33 }
34
35 #ifndef ROCKSDB_LITE
NotifyTableFileCreationStarted(const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,TableFileCreationReason reason)36 void EventHelpers::NotifyTableFileCreationStarted(
37 const std::vector<std::shared_ptr<EventListener>>& listeners,
38 const std::string& db_name, const std::string& cf_name,
39 const std::string& file_path, int job_id, TableFileCreationReason reason) {
40 if (listeners.empty()) {
41 return;
42 }
43 TableFileCreationBriefInfo info;
44 info.db_name = db_name;
45 info.cf_name = cf_name;
46 info.file_path = file_path;
47 info.job_id = job_id;
48 info.reason = reason;
49 for (auto& listener : listeners) {
50 listener->OnTableFileCreationStarted(info);
51 }
52 }
53 #endif // !ROCKSDB_LITE
54
NotifyOnBackgroundError(const std::vector<std::shared_ptr<EventListener>> & listeners,BackgroundErrorReason reason,Status * bg_error,InstrumentedMutex * db_mutex,bool * auto_recovery)55 void EventHelpers::NotifyOnBackgroundError(
56 const std::vector<std::shared_ptr<EventListener>>& listeners,
57 BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
58 bool* auto_recovery) {
59 #ifndef ROCKSDB_LITE
60 if (listeners.empty()) {
61 return;
62 }
63 db_mutex->AssertHeld();
64 // release lock while notifying events
65 db_mutex->Unlock();
66 for (auto& listener : listeners) {
67 listener->OnBackgroundError(reason, bg_error);
68 bg_error->PermitUncheckedError();
69 if (*auto_recovery) {
70 listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
71 }
72 }
73 db_mutex->Lock();
74 #else
75 (void)listeners;
76 (void)reason;
77 (void)bg_error;
78 (void)db_mutex;
79 (void)auto_recovery;
80 #endif // ROCKSDB_LITE
81 }
82
LogAndNotifyTableFileCreationFinished(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,const FileDescriptor & fd,uint64_t oldest_blob_file_number,const TableProperties & table_properties,TableFileCreationReason reason,const Status & s,const std::string & file_checksum,const std::string & file_checksum_func_name)83 void EventHelpers::LogAndNotifyTableFileCreationFinished(
84 EventLogger* event_logger,
85 const std::vector<std::shared_ptr<EventListener>>& listeners,
86 const std::string& db_name, const std::string& cf_name,
87 const std::string& file_path, int job_id, const FileDescriptor& fd,
88 uint64_t oldest_blob_file_number, const TableProperties& table_properties,
89 TableFileCreationReason reason, const Status& s,
90 const std::string& file_checksum,
91 const std::string& file_checksum_func_name) {
92 if (s.ok() && event_logger) {
93 JSONWriter jwriter;
94 AppendCurrentTime(&jwriter);
95 jwriter << "cf_name" << cf_name << "job" << job_id << "event"
96 << "table_file_creation"
97 << "file_number" << fd.GetNumber() << "file_size"
98 << fd.GetFileSize() << "file_checksum" << file_checksum
99 << "file_checksum_func_name" << file_checksum_func_name;
100
101 // table_properties
102 {
103 jwriter << "table_properties";
104 jwriter.StartObject();
105
106 // basic properties:
107 jwriter << "data_size" << table_properties.data_size << "index_size"
108 << table_properties.index_size << "index_partitions"
109 << table_properties.index_partitions << "top_level_index_size"
110 << table_properties.top_level_index_size
111 << "index_key_is_user_key"
112 << table_properties.index_key_is_user_key
113 << "index_value_is_delta_encoded"
114 << table_properties.index_value_is_delta_encoded << "filter_size"
115 << table_properties.filter_size << "raw_key_size"
116 << table_properties.raw_key_size << "raw_average_key_size"
117 << SafeDivide(table_properties.raw_key_size,
118 table_properties.num_entries)
119 << "raw_value_size" << table_properties.raw_value_size
120 << "raw_average_value_size"
121 << SafeDivide(table_properties.raw_value_size,
122 table_properties.num_entries)
123 << "num_data_blocks" << table_properties.num_data_blocks
124 << "num_entries" << table_properties.num_entries
125 << "num_filter_entries" << table_properties.num_filter_entries
126 << "num_deletions" << table_properties.num_deletions
127 << "num_merge_operands" << table_properties.num_merge_operands
128 << "num_range_deletions" << table_properties.num_range_deletions
129 << "format_version" << table_properties.format_version
130 << "fixed_key_len" << table_properties.fixed_key_len
131 << "filter_policy" << table_properties.filter_policy_name
132 << "column_family_name" << table_properties.column_family_name
133 << "column_family_id" << table_properties.column_family_id
134 << "comparator" << table_properties.comparator_name
135 << "merge_operator" << table_properties.merge_operator_name
136 << "prefix_extractor_name"
137 << table_properties.prefix_extractor_name << "property_collectors"
138 << table_properties.property_collectors_names << "compression"
139 << table_properties.compression_name << "compression_options"
140 << table_properties.compression_options << "creation_time"
141 << table_properties.creation_time << "oldest_key_time"
142 << table_properties.oldest_key_time << "file_creation_time"
143 << table_properties.file_creation_time
144 << "slow_compression_estimated_data_size"
145 << table_properties.slow_compression_estimated_data_size
146 << "fast_compression_estimated_data_size"
147 << table_properties.fast_compression_estimated_data_size
148 << "db_id" << table_properties.db_id << "db_session_id"
149 << table_properties.db_session_id << "orig_file_number"
150 << table_properties.orig_file_number;
151
152 // user collected properties
153 for (const auto& prop : table_properties.readable_properties) {
154 jwriter << prop.first << prop.second;
155 }
156 jwriter.EndObject();
157 }
158
159 if (oldest_blob_file_number != kInvalidBlobFileNumber) {
160 jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
161 }
162
163 jwriter.EndObject();
164
165 event_logger->Log(jwriter);
166 }
167
168 #ifndef ROCKSDB_LITE
169 if (listeners.empty()) {
170 return;
171 }
172 TableFileCreationInfo info;
173 info.db_name = db_name;
174 info.cf_name = cf_name;
175 info.file_path = file_path;
176 info.file_size = fd.file_size;
177 info.job_id = job_id;
178 info.table_properties = table_properties;
179 info.reason = reason;
180 info.status = s;
181 info.file_checksum = file_checksum;
182 info.file_checksum_func_name = file_checksum_func_name;
183 for (auto& listener : listeners) {
184 listener->OnTableFileCreated(info);
185 }
186 info.status.PermitUncheckedError();
187 #else
188 (void)listeners;
189 (void)db_name;
190 (void)cf_name;
191 (void)file_path;
192 (void)reason;
193 #endif // !ROCKSDB_LITE
194 }
195
LogAndNotifyTableFileDeletion(EventLogger * event_logger,int job_id,uint64_t file_number,const std::string & file_path,const Status & status,const std::string & dbname,const std::vector<std::shared_ptr<EventListener>> & listeners)196 void EventHelpers::LogAndNotifyTableFileDeletion(
197 EventLogger* event_logger, int job_id, uint64_t file_number,
198 const std::string& file_path, const Status& status,
199 const std::string& dbname,
200 const std::vector<std::shared_ptr<EventListener>>& listeners) {
201 JSONWriter jwriter;
202 AppendCurrentTime(&jwriter);
203
204 jwriter << "job" << job_id << "event"
205 << "table_file_deletion"
206 << "file_number" << file_number;
207 if (!status.ok()) {
208 jwriter << "status" << status.ToString();
209 }
210
211 jwriter.EndObject();
212
213 event_logger->Log(jwriter);
214
215 #ifndef ROCKSDB_LITE
216 if (listeners.empty()) {
217 return;
218 }
219 TableFileDeletionInfo info;
220 info.db_name = dbname;
221 info.job_id = job_id;
222 info.file_path = file_path;
223 info.status = status;
224 for (auto& listener : listeners) {
225 listener->OnTableFileDeleted(info);
226 }
227 info.status.PermitUncheckedError();
228 #else
229 (void)file_path;
230 (void)dbname;
231 (void)listeners;
232 #endif // !ROCKSDB_LITE
233 }
234
NotifyOnErrorRecoveryCompleted(const std::vector<std::shared_ptr<EventListener>> & listeners,Status old_bg_error,InstrumentedMutex * db_mutex)235 void EventHelpers::NotifyOnErrorRecoveryCompleted(
236 const std::vector<std::shared_ptr<EventListener>>& listeners,
237 Status old_bg_error, InstrumentedMutex* db_mutex) {
238 #ifndef ROCKSDB_LITE
239 if (!listeners.empty()) {
240 db_mutex->AssertHeld();
241 // release lock while notifying events
242 db_mutex->Unlock();
243 for (auto& listener : listeners) {
244 listener->OnErrorRecoveryCompleted(old_bg_error);
245 }
246 db_mutex->Lock();
247 }
248 old_bg_error.PermitUncheckedError();
249 #else
250 (void)listeners;
251 (void)old_bg_error;
252 (void)db_mutex;
253 #endif // ROCKSDB_LITE
254 }
255
256 #ifndef ROCKSDB_LITE
NotifyBlobFileCreationStarted(const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,BlobFileCreationReason creation_reason)257 void EventHelpers::NotifyBlobFileCreationStarted(
258 const std::vector<std::shared_ptr<EventListener>>& listeners,
259 const std::string& db_name, const std::string& cf_name,
260 const std::string& file_path, int job_id,
261 BlobFileCreationReason creation_reason) {
262 if (listeners.empty()) {
263 return;
264 }
265 BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
266 creation_reason);
267 for (const auto& listener : listeners) {
268 listener->OnBlobFileCreationStarted(info);
269 }
270 }
271 #endif // !ROCKSDB_LITE
272
LogAndNotifyBlobFileCreationFinished(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,uint64_t file_number,BlobFileCreationReason creation_reason,const Status & s,const std::string & file_checksum,const std::string & file_checksum_func_name,uint64_t total_blob_count,uint64_t total_blob_bytes)273 void EventHelpers::LogAndNotifyBlobFileCreationFinished(
274 EventLogger* event_logger,
275 const std::vector<std::shared_ptr<EventListener>>& listeners,
276 const std::string& db_name, const std::string& cf_name,
277 const std::string& file_path, int job_id, uint64_t file_number,
278 BlobFileCreationReason creation_reason, const Status& s,
279 const std::string& file_checksum,
280 const std::string& file_checksum_func_name, uint64_t total_blob_count,
281 uint64_t total_blob_bytes) {
282 if (s.ok() && event_logger) {
283 JSONWriter jwriter;
284 AppendCurrentTime(&jwriter);
285 jwriter << "cf_name" << cf_name << "job" << job_id << "event"
286 << "blob_file_creation"
287 << "file_number" << file_number << "total_blob_count"
288 << total_blob_count << "total_blob_bytes" << total_blob_bytes
289 << "file_checksum" << file_checksum << "file_checksum_func_name"
290 << file_checksum_func_name << "status" << s.ToString();
291
292 jwriter.EndObject();
293 event_logger->Log(jwriter);
294 }
295
296 #ifndef ROCKSDB_LITE
297 if (listeners.empty()) {
298 return;
299 }
300 BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
301 creation_reason, total_blob_count, total_blob_bytes,
302 s, file_checksum, file_checksum_func_name);
303 for (const auto& listener : listeners) {
304 listener->OnBlobFileCreated(info);
305 }
306 info.status.PermitUncheckedError();
307 #else
308 (void)listeners;
309 (void)db_name;
310 (void)file_path;
311 (void)creation_reason;
312 #endif
313 }
314
LogAndNotifyBlobFileDeletion(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,int job_id,uint64_t file_number,const std::string & file_path,const Status & status,const std::string & dbname)315 void EventHelpers::LogAndNotifyBlobFileDeletion(
316 EventLogger* event_logger,
317 const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
318 uint64_t file_number, const std::string& file_path, const Status& status,
319 const std::string& dbname) {
320 if (event_logger) {
321 JSONWriter jwriter;
322 AppendCurrentTime(&jwriter);
323
324 jwriter << "job" << job_id << "event"
325 << "blob_file_deletion"
326 << "file_number" << file_number;
327 if (!status.ok()) {
328 jwriter << "status" << status.ToString();
329 }
330
331 jwriter.EndObject();
332 event_logger->Log(jwriter);
333 }
334 #ifndef ROCKSDB_LITE
335 if (listeners.empty()) {
336 return;
337 }
338 BlobFileDeletionInfo info(dbname, file_path, job_id, status);
339 for (const auto& listener : listeners) {
340 listener->OnBlobFileDeleted(info);
341 }
342 info.status.PermitUncheckedError();
343 #else
344 (void)listeners;
345 (void)dbname;
346 (void)file_path;
347 #endif // !ROCKSDB_LITE
348 }
349
350 } // namespace ROCKSDB_NAMESPACE
351