1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/event_helpers.h"
7 
8 #include "rocksdb/convenience.h"
9 #include "rocksdb/listener.h"
10 #include "rocksdb/utilities/customizable_util.h"
11 
12 namespace ROCKSDB_NAMESPACE {
13 #ifndef ROCKSDB_LITE
CreateFromString(const ConfigOptions & config_options,const std::string & id,std::shared_ptr<EventListener> * result)14 Status EventListener::CreateFromString(const ConfigOptions& config_options,
15                                        const std::string& id,
16                                        std::shared_ptr<EventListener>* result) {
17   return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
18 }
19 #endif  // ROCKSDB_LITE
20 
namespace {
// Returns the quotient a / b, except that a divisor equal to zero yields
// zero instead of performing the division.
template <class T>
inline T SafeDivide(T a, T b) {
  if (b == 0) {
    return 0;
  }
  return a / b;
}
}  // namespace
27 
AppendCurrentTime(JSONWriter * jwriter)28 void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
29   *jwriter << "time_micros"
30            << std::chrono::duration_cast<std::chrono::microseconds>(
31                   std::chrono::system_clock::now().time_since_epoch())
32                   .count();
33 }
34 
35 #ifndef ROCKSDB_LITE
NotifyTableFileCreationStarted(const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,TableFileCreationReason reason)36 void EventHelpers::NotifyTableFileCreationStarted(
37     const std::vector<std::shared_ptr<EventListener>>& listeners,
38     const std::string& db_name, const std::string& cf_name,
39     const std::string& file_path, int job_id, TableFileCreationReason reason) {
40   if (listeners.empty()) {
41     return;
42   }
43   TableFileCreationBriefInfo info;
44   info.db_name = db_name;
45   info.cf_name = cf_name;
46   info.file_path = file_path;
47   info.job_id = job_id;
48   info.reason = reason;
49   for (auto& listener : listeners) {
50     listener->OnTableFileCreationStarted(info);
51   }
52 }
53 #endif  // !ROCKSDB_LITE
54 
NotifyOnBackgroundError(const std::vector<std::shared_ptr<EventListener>> & listeners,BackgroundErrorReason reason,Status * bg_error,InstrumentedMutex * db_mutex,bool * auto_recovery)55 void EventHelpers::NotifyOnBackgroundError(
56     const std::vector<std::shared_ptr<EventListener>>& listeners,
57     BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
58     bool* auto_recovery) {
59 #ifndef ROCKSDB_LITE
60   if (listeners.empty()) {
61     return;
62   }
63   db_mutex->AssertHeld();
64   // release lock while notifying events
65   db_mutex->Unlock();
66   for (auto& listener : listeners) {
67     listener->OnBackgroundError(reason, bg_error);
68     bg_error->PermitUncheckedError();
69     if (*auto_recovery) {
70       listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
71     }
72   }
73   db_mutex->Lock();
74 #else
75   (void)listeners;
76   (void)reason;
77   (void)bg_error;
78   (void)db_mutex;
79   (void)auto_recovery;
80 #endif  // ROCKSDB_LITE
81 }
82 
LogAndNotifyTableFileCreationFinished(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,const FileDescriptor & fd,uint64_t oldest_blob_file_number,const TableProperties & table_properties,TableFileCreationReason reason,const Status & s,const std::string & file_checksum,const std::string & file_checksum_func_name)83 void EventHelpers::LogAndNotifyTableFileCreationFinished(
84     EventLogger* event_logger,
85     const std::vector<std::shared_ptr<EventListener>>& listeners,
86     const std::string& db_name, const std::string& cf_name,
87     const std::string& file_path, int job_id, const FileDescriptor& fd,
88     uint64_t oldest_blob_file_number, const TableProperties& table_properties,
89     TableFileCreationReason reason, const Status& s,
90     const std::string& file_checksum,
91     const std::string& file_checksum_func_name) {
92   if (s.ok() && event_logger) {
93     JSONWriter jwriter;
94     AppendCurrentTime(&jwriter);
95     jwriter << "cf_name" << cf_name << "job" << job_id << "event"
96             << "table_file_creation"
97             << "file_number" << fd.GetNumber() << "file_size"
98             << fd.GetFileSize() << "file_checksum" << file_checksum
99             << "file_checksum_func_name" << file_checksum_func_name;
100 
101     // table_properties
102     {
103       jwriter << "table_properties";
104       jwriter.StartObject();
105 
106       // basic properties:
107       jwriter << "data_size" << table_properties.data_size << "index_size"
108               << table_properties.index_size << "index_partitions"
109               << table_properties.index_partitions << "top_level_index_size"
110               << table_properties.top_level_index_size
111               << "index_key_is_user_key"
112               << table_properties.index_key_is_user_key
113               << "index_value_is_delta_encoded"
114               << table_properties.index_value_is_delta_encoded << "filter_size"
115               << table_properties.filter_size << "raw_key_size"
116               << table_properties.raw_key_size << "raw_average_key_size"
117               << SafeDivide(table_properties.raw_key_size,
118                             table_properties.num_entries)
119               << "raw_value_size" << table_properties.raw_value_size
120               << "raw_average_value_size"
121               << SafeDivide(table_properties.raw_value_size,
122                             table_properties.num_entries)
123               << "num_data_blocks" << table_properties.num_data_blocks
124               << "num_entries" << table_properties.num_entries
125               << "num_filter_entries" << table_properties.num_filter_entries
126               << "num_deletions" << table_properties.num_deletions
127               << "num_merge_operands" << table_properties.num_merge_operands
128               << "num_range_deletions" << table_properties.num_range_deletions
129               << "format_version" << table_properties.format_version
130               << "fixed_key_len" << table_properties.fixed_key_len
131               << "filter_policy" << table_properties.filter_policy_name
132               << "column_family_name" << table_properties.column_family_name
133               << "column_family_id" << table_properties.column_family_id
134               << "comparator" << table_properties.comparator_name
135               << "merge_operator" << table_properties.merge_operator_name
136               << "prefix_extractor_name"
137               << table_properties.prefix_extractor_name << "property_collectors"
138               << table_properties.property_collectors_names << "compression"
139               << table_properties.compression_name << "compression_options"
140               << table_properties.compression_options << "creation_time"
141               << table_properties.creation_time << "oldest_key_time"
142               << table_properties.oldest_key_time << "file_creation_time"
143               << table_properties.file_creation_time
144               << "slow_compression_estimated_data_size"
145               << table_properties.slow_compression_estimated_data_size
146               << "fast_compression_estimated_data_size"
147               << table_properties.fast_compression_estimated_data_size
148               << "db_id" << table_properties.db_id << "db_session_id"
149               << table_properties.db_session_id << "orig_file_number"
150               << table_properties.orig_file_number;
151 
152       // user collected properties
153       for (const auto& prop : table_properties.readable_properties) {
154         jwriter << prop.first << prop.second;
155       }
156       jwriter.EndObject();
157     }
158 
159     if (oldest_blob_file_number != kInvalidBlobFileNumber) {
160       jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
161     }
162 
163     jwriter.EndObject();
164 
165     event_logger->Log(jwriter);
166   }
167 
168 #ifndef ROCKSDB_LITE
169   if (listeners.empty()) {
170     return;
171   }
172   TableFileCreationInfo info;
173   info.db_name = db_name;
174   info.cf_name = cf_name;
175   info.file_path = file_path;
176   info.file_size = fd.file_size;
177   info.job_id = job_id;
178   info.table_properties = table_properties;
179   info.reason = reason;
180   info.status = s;
181   info.file_checksum = file_checksum;
182   info.file_checksum_func_name = file_checksum_func_name;
183   for (auto& listener : listeners) {
184     listener->OnTableFileCreated(info);
185   }
186   info.status.PermitUncheckedError();
187 #else
188   (void)listeners;
189   (void)db_name;
190   (void)cf_name;
191   (void)file_path;
192   (void)reason;
193 #endif  // !ROCKSDB_LITE
194 }
195 
LogAndNotifyTableFileDeletion(EventLogger * event_logger,int job_id,uint64_t file_number,const std::string & file_path,const Status & status,const std::string & dbname,const std::vector<std::shared_ptr<EventListener>> & listeners)196 void EventHelpers::LogAndNotifyTableFileDeletion(
197     EventLogger* event_logger, int job_id, uint64_t file_number,
198     const std::string& file_path, const Status& status,
199     const std::string& dbname,
200     const std::vector<std::shared_ptr<EventListener>>& listeners) {
201   JSONWriter jwriter;
202   AppendCurrentTime(&jwriter);
203 
204   jwriter << "job" << job_id << "event"
205           << "table_file_deletion"
206           << "file_number" << file_number;
207   if (!status.ok()) {
208     jwriter << "status" << status.ToString();
209   }
210 
211   jwriter.EndObject();
212 
213   event_logger->Log(jwriter);
214 
215 #ifndef ROCKSDB_LITE
216   if (listeners.empty()) {
217     return;
218   }
219   TableFileDeletionInfo info;
220   info.db_name = dbname;
221   info.job_id = job_id;
222   info.file_path = file_path;
223   info.status = status;
224   for (auto& listener : listeners) {
225     listener->OnTableFileDeleted(info);
226   }
227   info.status.PermitUncheckedError();
228 #else
229   (void)file_path;
230   (void)dbname;
231   (void)listeners;
232 #endif  // !ROCKSDB_LITE
233 }
234 
NotifyOnErrorRecoveryCompleted(const std::vector<std::shared_ptr<EventListener>> & listeners,Status old_bg_error,InstrumentedMutex * db_mutex)235 void EventHelpers::NotifyOnErrorRecoveryCompleted(
236     const std::vector<std::shared_ptr<EventListener>>& listeners,
237     Status old_bg_error, InstrumentedMutex* db_mutex) {
238 #ifndef ROCKSDB_LITE
239   if (!listeners.empty()) {
240     db_mutex->AssertHeld();
241     // release lock while notifying events
242     db_mutex->Unlock();
243     for (auto& listener : listeners) {
244       listener->OnErrorRecoveryCompleted(old_bg_error);
245     }
246     db_mutex->Lock();
247   }
248   old_bg_error.PermitUncheckedError();
249 #else
250   (void)listeners;
251   (void)old_bg_error;
252   (void)db_mutex;
253 #endif  // ROCKSDB_LITE
254 }
255 
256 #ifndef ROCKSDB_LITE
NotifyBlobFileCreationStarted(const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,BlobFileCreationReason creation_reason)257 void EventHelpers::NotifyBlobFileCreationStarted(
258     const std::vector<std::shared_ptr<EventListener>>& listeners,
259     const std::string& db_name, const std::string& cf_name,
260     const std::string& file_path, int job_id,
261     BlobFileCreationReason creation_reason) {
262   if (listeners.empty()) {
263     return;
264   }
265   BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
266                                  creation_reason);
267   for (const auto& listener : listeners) {
268     listener->OnBlobFileCreationStarted(info);
269   }
270 }
271 #endif  // !ROCKSDB_LITE
272 
LogAndNotifyBlobFileCreationFinished(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,const std::string & db_name,const std::string & cf_name,const std::string & file_path,int job_id,uint64_t file_number,BlobFileCreationReason creation_reason,const Status & s,const std::string & file_checksum,const std::string & file_checksum_func_name,uint64_t total_blob_count,uint64_t total_blob_bytes)273 void EventHelpers::LogAndNotifyBlobFileCreationFinished(
274     EventLogger* event_logger,
275     const std::vector<std::shared_ptr<EventListener>>& listeners,
276     const std::string& db_name, const std::string& cf_name,
277     const std::string& file_path, int job_id, uint64_t file_number,
278     BlobFileCreationReason creation_reason, const Status& s,
279     const std::string& file_checksum,
280     const std::string& file_checksum_func_name, uint64_t total_blob_count,
281     uint64_t total_blob_bytes) {
282   if (s.ok() && event_logger) {
283     JSONWriter jwriter;
284     AppendCurrentTime(&jwriter);
285     jwriter << "cf_name" << cf_name << "job" << job_id << "event"
286             << "blob_file_creation"
287             << "file_number" << file_number << "total_blob_count"
288             << total_blob_count << "total_blob_bytes" << total_blob_bytes
289             << "file_checksum" << file_checksum << "file_checksum_func_name"
290             << file_checksum_func_name << "status" << s.ToString();
291 
292     jwriter.EndObject();
293     event_logger->Log(jwriter);
294   }
295 
296 #ifndef ROCKSDB_LITE
297   if (listeners.empty()) {
298     return;
299   }
300   BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
301                             creation_reason, total_blob_count, total_blob_bytes,
302                             s, file_checksum, file_checksum_func_name);
303   for (const auto& listener : listeners) {
304     listener->OnBlobFileCreated(info);
305   }
306   info.status.PermitUncheckedError();
307 #else
308   (void)listeners;
309   (void)db_name;
310   (void)file_path;
311   (void)creation_reason;
312 #endif
313 }
314 
LogAndNotifyBlobFileDeletion(EventLogger * event_logger,const std::vector<std::shared_ptr<EventListener>> & listeners,int job_id,uint64_t file_number,const std::string & file_path,const Status & status,const std::string & dbname)315 void EventHelpers::LogAndNotifyBlobFileDeletion(
316     EventLogger* event_logger,
317     const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
318     uint64_t file_number, const std::string& file_path, const Status& status,
319     const std::string& dbname) {
320   if (event_logger) {
321     JSONWriter jwriter;
322     AppendCurrentTime(&jwriter);
323 
324     jwriter << "job" << job_id << "event"
325             << "blob_file_deletion"
326             << "file_number" << file_number;
327     if (!status.ok()) {
328       jwriter << "status" << status.ToString();
329     }
330 
331     jwriter.EndObject();
332     event_logger->Log(jwriter);
333   }
334 #ifndef ROCKSDB_LITE
335   if (listeners.empty()) {
336     return;
337   }
338   BlobFileDeletionInfo info(dbname, file_path, job_id, status);
339   for (const auto& listener : listeners) {
340     listener->OnBlobFileDeleted(info);
341   }
342   info.status.PermitUncheckedError();
343 #else
344   (void)listeners;
345   (void)dbname;
346   (void)file_path;
347 #endif  // !ROCKSDB_LITE
348 }
349 
350 }  // namespace ROCKSDB_NAMESPACE
351