1 /*
2 * Trace.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21
22 #include "flow/Trace.h"
23 #include "flow/FileTraceLogWriter.h"
24 #include "flow/XmlTraceLogFormatter.h"
25 #include "flow/JsonTraceLogFormatter.h"
26 #include "flow/flow.h"
27 #include "flow/DeterministicRandom.h"
28 #include <stdlib.h>
29 #include <stdarg.h>
30 #include <cctype>
31 #include <time.h>
32
33 #include "flow/IThreadPool.h"
34 #include "flow/ThreadHelper.actor.h"
35 #include "flow/FastRef.h"
36 #include "flow/EventTypes.actor.h"
37 #include "flow/TDMetric.actor.h"
38 #include "flow/MetricSample.h"
39
40 #ifdef _WIN32
41 #include <windows.h>
42 #undef max
43 #undef min
44 #endif
45
46 class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
47 public:
~DummyThreadPool()48 ~DummyThreadPool() {}
DummyThreadPool()49 DummyThreadPool() : thread(NULL) {}
getError()50 Future<Void> getError() {
51 return errors.getFuture();
52 }
addThread(IThreadPoolReceiver * userData)53 void addThread( IThreadPoolReceiver* userData ) {
54 ASSERT( !thread );
55 thread = userData;
56 }
post(PThreadAction action)57 void post( PThreadAction action ) {
58 try {
59 (*action)( thread );
60 } catch (Error& e) {
61 errors.sendError( e );
62 } catch (...) {
63 errors.sendError( unknown_error() );
64 }
65 }
stop()66 Future<Void> stop() {
67 return Void();
68 }
addref()69 void addref() {
70 ReferenceCounted<DummyThreadPool>::addref();
71 }
delref()72 void delref() {
73 ReferenceCounted<DummyThreadPool>::delref();
74 }
75
76 private:
77 IThreadPoolReceiver* thread;
78 Promise<Void> errors;
79 };
80
81 struct SuppressionMap {
82 struct SuppressionInfo {
83 double endTime;
84 int64_t suppressedEventCount;
85
SuppressionInfoSuppressionMap::SuppressionInfo86 SuppressionInfo() : endTime(0), suppressedEventCount(0) {}
87 };
88
89 std::map<std::string, SuppressionInfo> suppressionMap;
90
91 // Returns -1 if this event is suppressed
checkAndInsertSuppressionSuppressionMap92 int64_t checkAndInsertSuppression(std::string type, double duration) {
93 ASSERT(g_network);
94 if(suppressionMap.size() >= FLOW_KNOBS->MAX_TRACE_SUPPRESSIONS) {
95 TraceEvent(SevWarnAlways, "ClearingTraceSuppressionMap");
96 suppressionMap.clear();
97 }
98
99 auto insertion = suppressionMap.insert(std::make_pair(type, SuppressionInfo()));
100 if(insertion.second || insertion.first->second.endTime <= now()) {
101 int64_t suppressedEventCount = insertion.first->second.suppressedEventCount;
102 insertion.first->second.endTime = now() + duration;
103 insertion.first->second.suppressedEventCount = 0;
104 return suppressedEventCount;
105 }
106 else {
107 ++insertion.first->second.suppressedEventCount;
108 return -1;
109 }
110 }
111 };
112
// Batched events/attaches/buggify records; dumped by openTraceFile() once the log is open.
TraceBatch g_traceBatch;
// Selects the timestamp source for trace events: now() when TRACE_CLOCK_NOW, timer() otherwise.
trace_clock_t g_trace_clock = TRACE_CLOCK_NOW;
// Random source used by TraceInterval::begin() to generate pair IDs.
IRandom* trace_random = NULL;

// Most recent tracked events and errors, keyed by local network address.
LatestEventCache latestEventCache;
// Suppression windows consulted by TraceEvent::suppressFor().
SuppressionMap suppressedEvents;

// Allocated by TraceEvent::setNetworkThread(); used to throttle very frequent event types.
static TransientThresholdMetricSample<Standalone<StringRef>> *traceEventThrottlerCache;
// Prefixes prepended to the offending event type when reporting throttling / misuse of suppression.
static const char *TRACE_EVENT_THROTTLE_STARTING_TYPE = "TraceEventThrottle_";
static const char *TRACE_EVENT_INVALID_SUPPRESSION = "InvalidSuppression_";
// Cap (in TraceEventFields::sizeBytes() units) on events buffered before the trace log is opened.
static int TRACE_LOG_MAX_PREOPEN_BUFFER = 1000000;
// Cap (in TraceEventFields::sizeBytes() units) on a single event; exceeding it disables the event.
static int TRACE_EVENT_MAX_SIZE = 4000;
125
126 struct TraceLog {
127 Reference<ITraceLogFormatter> formatter;
128
129 private:
130 Reference<ITraceLogWriter> logWriter;
131 std::vector<TraceEventFields> eventBuffer;
132 int loggedLength;
133 int bufferLength;
134 bool opened;
135 int64_t preopenOverflowCount;
136 std::string basename;
137 std::string logGroup;
138
139 std::string directory;
140 std::string processName;
141 Optional<NetworkAddress> localAddress;
142
143 Reference<IThreadPool> writer;
144 uint64_t rollsize;
145 Mutex mutex;
146
147 EventMetricHandle<TraceEventNameID> SevErrorNames;
148 EventMetricHandle<TraceEventNameID> SevWarnAlwaysNames;
149 EventMetricHandle<TraceEventNameID> SevWarnNames;
150 EventMetricHandle<TraceEventNameID> SevInfoNames;
151 EventMetricHandle<TraceEventNameID> SevDebugNames;
152
153 struct RoleInfo {
154 std::map<std::string, int> roles;
155 std::string rolesString;
156
refreshRolesStringTraceLog::RoleInfo157 void refreshRolesString() {
158 rolesString = "";
159 for(auto itr : roles) {
160 if(!rolesString.empty()) {
161 rolesString += ",";
162 }
163 rolesString += itr.first;
164 }
165 }
166 };
167
168 RoleInfo roleInfo;
169 std::map<NetworkAddress, RoleInfo> roleInfoMap;
170
mutateRoleInfoTraceLog171 RoleInfo& mutateRoleInfo() {
172 ASSERT(g_network);
173
174 if(g_network->isSimulated()) {
175 return roleInfoMap[g_network->getLocalAddress()];
176 }
177
178 return roleInfo;
179 }
180
181 public:
182 bool logTraceEventMetrics;
183
initMetricsTraceLog184 void initMetrics()
185 {
186 SevErrorNames.init(LiteralStringRef("TraceEvents.SevError"));
187 SevWarnAlwaysNames.init(LiteralStringRef("TraceEvents.SevWarnAlways"));
188 SevWarnNames.init(LiteralStringRef("TraceEvents.SevWarn"));
189 SevInfoNames.init(LiteralStringRef("TraceEvents.SevInfo"));
190 SevDebugNames.init(LiteralStringRef("TraceEvents.SevDebug"));
191 logTraceEventMetrics = true;
192 }
193
194 struct BarrierList : ThreadSafeReferenceCounted<BarrierList> {
BarrierListTraceLog::BarrierList195 BarrierList() : ntriggered(0) {}
pushTraceLog::BarrierList196 void push( ThreadFuture<Void> f ) {
197 MutexHolder h(mutex);
198 barriers.push_back(f);
199 }
popTraceLog::BarrierList200 void pop() {
201 MutexHolder h(mutex);
202 unsafeTrigger(0);
203 barriers.pop_front();
204 if (ntriggered) ntriggered--;
205 }
triggerAllTraceLog::BarrierList206 void triggerAll() {
207 MutexHolder h(mutex);
208 for(uint32_t i=ntriggered; i<barriers.size(); i++)
209 unsafeTrigger(i);
210 ntriggered = barriers.size();
211 }
212 private:
213 Mutex mutex;
214 Deque< ThreadFuture<Void> > barriers;
215 int ntriggered;
unsafeTriggerTraceLog::BarrierList216 void unsafeTrigger(int i) {
217 auto b = ((ThreadSingleAssignmentVar<Void>*)barriers[i].getPtr());
218 if (!b->isReady())
219 b->send(Void());
220 }
221 };
222
223 Reference<BarrierList> barriers;
224
225 struct WriterThread : IThreadPoolReceiver {
WriterThreadTraceLog::WriterThread226 WriterThread( Reference<BarrierList> barriers, Reference<ITraceLogWriter> logWriter, Reference<ITraceLogFormatter> formatter )
227 : barriers(barriers), logWriter(logWriter), formatter(formatter) {}
228
initTraceLog::WriterThread229 virtual void init() {}
230
231 Reference<ITraceLogWriter> logWriter;
232 Reference<ITraceLogFormatter> formatter;
233 Reference<BarrierList> barriers;
234
235 struct Open : TypedAction<WriterThread,Open> {
getTimeEstimateTraceLog::WriterThread::Open236 virtual double getTimeEstimate() { return 0; }
237 };
actionTraceLog::WriterThread238 void action( Open& o ) {
239 logWriter->open();
240 logWriter->write(formatter->getHeader());
241 }
242
243 struct Close : TypedAction<WriterThread,Close> {
getTimeEstimateTraceLog::WriterThread::Close244 virtual double getTimeEstimate() { return 0; }
245 };
actionTraceLog::WriterThread246 void action( Close& c ) {
247 logWriter->write(formatter->getFooter());
248 logWriter->close();
249 }
250
251 struct Roll : TypedAction<WriterThread,Roll> {
getTimeEstimateTraceLog::WriterThread::Roll252 virtual double getTimeEstimate() { return 0; }
253 };
actionTraceLog::WriterThread254 void action( Roll& c ) {
255 logWriter->write(formatter->getFooter());
256 logWriter->roll();
257 logWriter->write(formatter->getHeader());
258 }
259
260 struct Barrier : TypedAction<WriterThread, Barrier> {
getTimeEstimateTraceLog::WriterThread::Barrier261 virtual double getTimeEstimate() { return 0; }
262 };
actionTraceLog::WriterThread263 void action( Barrier& a ) {
264 barriers->pop();
265 }
266
267 struct WriteBuffer : TypedAction<WriterThread, WriteBuffer> {
268 std::vector<TraceEventFields> events;
269
WriteBufferTraceLog::WriterThread::WriteBuffer270 WriteBuffer(std::vector<TraceEventFields> events) : events(events) {}
getTimeEstimateTraceLog::WriterThread::WriteBuffer271 virtual double getTimeEstimate() { return .001; }
272 };
actionTraceLog::WriterThread273 void action( WriteBuffer& a ) {
274 for(auto event : a.events) {
275 event.validateFormat();
276 logWriter->write(formatter->formatEvent(event));
277 }
278
279 if(FLOW_KNOBS->TRACE_SYNC_ENABLED) {
280 logWriter->sync();
281 }
282 }
283 };
284
TraceLogTraceLog285 TraceLog() : bufferLength(0), loggedLength(0), opened(false), preopenOverflowCount(0), barriers(new BarrierList), logTraceEventMetrics(false), formatter(new XmlTraceLogFormatter()) {}
286
isOpenTraceLog287 bool isOpen() const { return opened; }
288
openTraceLog289 void open( std::string const& directory, std::string const& processName, std::string logGroup, std::string const& timestamp, uint64_t rs, uint64_t maxLogsSize, Optional<NetworkAddress> na ) {
290 ASSERT( !writer && !opened );
291
292 this->directory = directory;
293 this->processName = processName;
294 this->logGroup = logGroup;
295 this->localAddress = na;
296
297 basename = format("%s/%s.%s.%s", directory.c_str(), processName.c_str(), timestamp.c_str(), g_random->randomAlphaNumeric(6).c_str());
298 logWriter = Reference<ITraceLogWriter>(new FileTraceLogWriter(directory, processName, basename, formatter->getExtension(), maxLogsSize, [this](){ barriers->triggerAll(); }));
299
300 if ( g_network->isSimulated() )
301 writer = Reference<IThreadPool>(new DummyThreadPool());
302 else
303 writer = createGenericThreadPool();
304 writer->addThread( new WriterThread(barriers, logWriter, formatter) );
305
306 rollsize = rs;
307
308 auto a = new WriterThread::Open;
309 writer->post(a);
310
311 MutexHolder holder(mutex);
312 if(g_network->isSimulated()) {
313 // We don't support early trace logs in simulation.
314 // This is because we don't know if we're being simulated prior to the network being created, which causes two ambiguities:
315 //
316 // 1. We need to employ two different methods to determine the time of an event prior to the network starting for real-world and simulated runs.
317 // 2. Simulated runs manually insert the Machine field at TraceEvent creation time. Real-world runs add this field at write time.
318 //
319 // Without the ability to resolve the ambiguity, we've chosen to always favor the real-world approach and not support such events in simulation.
320 eventBuffer.clear();
321 }
322
323 for(TraceEventFields &fields : eventBuffer) {
324 annotateEvent(fields);
325 }
326
327 opened = true;
328 if(preopenOverflowCount > 0) {
329 TraceEvent(SevWarn, "TraceLogPreopenOverflow").detail("OverflowEventCount", preopenOverflowCount);
330 preopenOverflowCount = 0;
331 }
332 }
333
annotateEventTraceLog334 void annotateEvent( TraceEventFields &fields ) {
335 if(localAddress.present()) {
336 fields.addField("Machine", formatIpPort(localAddress.get().ip, localAddress.get().port));
337 }
338
339 fields.addField("LogGroup", logGroup);
340
341 RoleInfo const& r = mutateRoleInfo();
342 if(r.rolesString.size() > 0) {
343 fields.addField("Roles", r.rolesString);
344 }
345 }
346
writeEventTraceLog347 void writeEvent( TraceEventFields fields, std::string trackLatestKey, bool trackError ) {
348 MutexHolder hold(mutex);
349
350 if(opened) {
351 annotateEvent(fields);
352 }
353
354 if(!trackLatestKey.empty()) {
355 fields.addField("TrackLatestType", "Original");
356 }
357
358 if(!isOpen() && (preopenOverflowCount > 0 || bufferLength + fields.sizeBytes() > TRACE_LOG_MAX_PREOPEN_BUFFER)) {
359 ++preopenOverflowCount;
360 return;
361 }
362
363 // FIXME: What if we are using way too much memory for buffer?
364 eventBuffer.push_back(fields);
365 bufferLength += fields.sizeBytes();
366
367 if(trackError) {
368 latestEventCache.setLatestError(fields);
369 }
370 if(!trackLatestKey.empty()) {
371 latestEventCache.set(trackLatestKey, fields);
372 }
373 }
374
logTraceLog375 void log(int severity, const char *name, UID id, uint64_t event_ts)
376 {
377 if(!logTraceEventMetrics)
378 return;
379
380 EventMetricHandle<TraceEventNameID> *m = NULL;
381 switch(severity)
382 {
383 case SevError: m = &SevErrorNames; break;
384 case SevWarnAlways: m = &SevWarnAlwaysNames; break;
385 case SevWarn: m = &SevWarnNames; break;
386 case SevInfo: m = &SevInfoNames; break;
387 case SevDebug: m = &SevDebugNames; break;
388 default:
389 break;
390 }
391 if(m != NULL)
392 {
393 (*m)->name = StringRef((uint8_t*)name, strlen(name));
394 (*m)->id = id.toString();
395 (*m)->log(event_ts);
396 }
397 }
398
flushTraceLog399 ThreadFuture<Void> flush() {
400 traceEventThrottlerCache->poll();
401
402 MutexHolder hold(mutex);
403 bool roll = false;
404 if (!eventBuffer.size()) return Void(); // SOMEDAY: maybe we still roll the tracefile here?
405
406 if (rollsize && bufferLength + loggedLength > rollsize) // SOMEDAY: more conditions to roll
407 roll = true;
408
409 auto a = new WriterThread::WriteBuffer( std::move(eventBuffer) );
410 loggedLength += bufferLength;
411 eventBuffer = std::vector<TraceEventFields>();
412 bufferLength = 0;
413 writer->post( a );
414
415 if (roll) {
416 auto o = new WriterThread::Roll;
417 writer->post(o);
418
419 std::vector<TraceEventFields> events = latestEventCache.getAllUnsafe();
420 for (int idx = 0; idx < events.size(); idx++) {
421 if(events[idx].size() > 0) {
422 TraceEventFields rolledFields;
423 for(auto itr = events[idx].begin(); itr != events[idx].end(); ++itr) {
424 if(itr->first == "Time") {
425 rolledFields.addField("Time", format("%.6f", (g_trace_clock == TRACE_CLOCK_NOW) ? now() : timer()));
426 rolledFields.addField("OriginalTime", itr->second);
427 }
428 else if(itr->first == "TrackLatestType") {
429 rolledFields.addField("TrackLatestType", "Rolled");
430 }
431 else {
432 rolledFields.addField(itr->first, itr->second);
433 }
434 }
435
436 eventBuffer.push_back(rolledFields);
437 }
438 }
439
440 loggedLength = 0;
441 }
442
443 ThreadFuture<Void> f(new ThreadSingleAssignmentVar<Void>);
444 barriers->push(f);
445 writer->post( new WriterThread::Barrier );
446
447 return f;
448 }
449
closeTraceLog450 void close() {
451 if (opened) {
452 MutexHolder hold(mutex);
453
454 // Write remaining contents
455 auto a = new WriterThread::WriteBuffer( std::move(eventBuffer) );
456 loggedLength += bufferLength;
457 eventBuffer = std::vector<TraceEventFields>();
458 bufferLength = 0;
459 writer->post( a );
460
461 auto c = new WriterThread::Close();
462 writer->post( c );
463
464 ThreadFuture<Void> f(new ThreadSingleAssignmentVar<Void>);
465 barriers->push(f);
466 writer->post( new WriterThread::Barrier );
467
468 f.getBlocking();
469
470 opened = false;
471 }
472 }
473
addRoleTraceLog474 void addRole(std::string role) {
475 MutexHolder holder(mutex);
476
477 RoleInfo &r = mutateRoleInfo();
478 ++r.roles[role];
479 r.refreshRolesString();
480 }
481
removeRoleTraceLog482 void removeRole(std::string role) {
483 MutexHolder holder(mutex);
484
485 RoleInfo &r = mutateRoleInfo();
486
487 auto itr = r.roles.find(role);
488 ASSERT(itr != r.roles.end() || (g_network->isSimulated() && g_network->getLocalAddress() == NetworkAddress()));
489
490 if(itr != r.roles.end() && --(*itr).second == 0) {
491 r.roles.erase(itr);
492 r.refreshRolesString();
493 }
494 }
495
~TraceLogTraceLog496 ~TraceLog() {
497 close();
498 if (writer) writer->addref(); // FIXME: We are not shutting down the writer thread at all, because the ThreadPool shutdown mechanism is blocking (necessarily waits for current work items to finish) and we might not be able to finish everything.
499 }
500 };
501
getAddressIndex()502 NetworkAddress getAddressIndex() {
503 // ahm
504 // if( g_network->isSimulated() )
505 // return g_simulator.getCurrentProcess()->address;
506 // else
507 return g_network->getLocalAddress();
508 }
509
510 // This does not check for simulation, and as such is not safe for external callers
clearPrefix_internal(std::map<std::string,TraceEventFields> & data,std::string prefix)511 void clearPrefix_internal( std::map<std::string, TraceEventFields>& data, std::string prefix ) {
512 auto first = data.lower_bound( prefix );
513 auto last = data.lower_bound( strinc( prefix ).toString() );
514 data.erase( first, last );
515 }
516
clear(std::string prefix)517 void LatestEventCache::clear( std::string prefix ) {
518 clearPrefix_internal( latest[getAddressIndex()], prefix );
519 }
520
clear()521 void LatestEventCache::clear() {
522 latest[getAddressIndex()].clear();
523 }
524
set(std::string tag,const TraceEventFields & contents)525 void LatestEventCache::set( std::string tag, const TraceEventFields& contents ) {
526 latest[getAddressIndex()][tag] = contents;
527 }
528
get(std::string tag)529 TraceEventFields LatestEventCache::get( std::string tag ) {
530 return latest[getAddressIndex()][tag];
531 }
532
allEvents(std::map<std::string,TraceEventFields> const & data)533 std::vector<TraceEventFields> allEvents( std::map<std::string, TraceEventFields> const& data ) {
534 std::vector<TraceEventFields> all;
535 for(auto it = data.begin(); it != data.end(); it++) {
536 all.push_back( it->second );
537 }
538 return all;
539 }
540
getAll()541 std::vector<TraceEventFields> LatestEventCache::getAll() {
542 return allEvents( latest[getAddressIndex()] );
543 }
544
545 // if in simulation, all events from all machines will be returned
getAllUnsafe()546 std::vector<TraceEventFields> LatestEventCache::getAllUnsafe() {
547 std::vector<TraceEventFields> all;
548 for(auto it = latest.begin(); it != latest.end(); ++it) {
549 auto m = allEvents( it->second );
550 all.insert( all.end(), m.begin(), m.end() );
551 }
552 return all;
553 }
554
setLatestError(const TraceEventFields & contents)555 void LatestEventCache::setLatestError( const TraceEventFields& contents ) {
556 if(TraceEvent::isNetworkThread()) { // The latest event cache doesn't track errors that happen on other threads
557 latestErrors[getAddressIndex()] = contents;
558 }
559 }
560
getLatestError()561 TraceEventFields LatestEventCache::getLatestError() {
562 return latestErrors[getAddressIndex()];
563 }
564
// The single trace log instance used by every function in this file.
static TraceLog g_traceLog;
566
567 namespace {
568 template <bool validate>
traceFormatImpl(std::string & format)569 bool traceFormatImpl(std::string& format) {
570 std::transform(format.begin(), format.end(), format.begin(), ::tolower);
571 if (format == "xml") {
572 if (!validate) {
573 g_traceLog.formatter = Reference<ITraceLogFormatter>(new XmlTraceLogFormatter());
574 }
575 return true;
576 } else if (format == "json") {
577 if (!validate) {
578 g_traceLog.formatter = Reference<ITraceLogFormatter>(new JsonTraceLogFormatter());
579 }
580 return true;
581 } else {
582 if (!validate) {
583 g_traceLog.formatter = Reference<ITraceLogFormatter>(new XmlTraceLogFormatter());
584 }
585 return false;
586 }
587 }
588 } // namespace
589
selectTraceFormatter(std::string format)590 bool selectTraceFormatter(std::string format) {
591 ASSERT(!g_traceLog.isOpen());
592 bool recognized = traceFormatImpl</*validate*/ false>(format);
593 if (!recognized) {
594 TraceEvent(SevWarnAlways, "UnrecognizedTraceFormat").detail("format", format);
595 }
596 return recognized;
597 }
598
validateTraceFormat(std::string format)599 bool validateTraceFormat(std::string format) {
600 return traceFormatImpl</*validate*/ true>(format);
601 }
602
flushTraceFile()603 ThreadFuture<Void> flushTraceFile() {
604 if (!g_traceLog.isOpen())
605 return Void();
606 return g_traceLog.flush();
607 }
608
flushTraceFileVoid()609 void flushTraceFileVoid() {
610 if ( g_network && g_network->isSimulated() )
611 flushTraceFile();
612 else {
613 flushTraceFile().getBlocking();
614 }
615 }
616
openTraceFile(const NetworkAddress & na,uint64_t rollsize,uint64_t maxLogsSize,std::string directory,std::string baseOfBase,std::string logGroup)617 void openTraceFile(const NetworkAddress& na, uint64_t rollsize, uint64_t maxLogsSize, std::string directory, std::string baseOfBase, std::string logGroup) {
618 if(g_traceLog.isOpen())
619 return;
620
621 if(directory.empty())
622 directory = ".";
623
624 if (baseOfBase.empty())
625 baseOfBase = "trace";
626
627 std::string ip = na.ip.toString();
628 std::replace(ip.begin(), ip.end(), ':', '_'); // For IPv6, Windows doesn't accept ':' in filenames.
629 std::string baseName = format("%s.%s.%d", baseOfBase.c_str(), ip.c_str(), na.port);
630 g_traceLog.open( directory, baseName, logGroup, format("%lld", time(NULL)), rollsize, maxLogsSize, !g_network->isSimulated() ? na : Optional<NetworkAddress>());
631
632 uncancellable(recurring(&flushTraceFile, FLOW_KNOBS->TRACE_FLUSH_INTERVAL, TaskFlushTrace));
633 g_traceBatch.dump();
634 }
635
initTraceEventMetrics()636 void initTraceEventMetrics() {
637 g_traceLog.initMetrics();
638 }
639
closeTraceFile()640 void closeTraceFile() {
641 g_traceLog.close();
642 }
643
traceFileIsOpen()644 bool traceFileIsOpen() {
645 return g_traceLog.isOpen();
646 }
647
addTraceRole(std::string role)648 void addTraceRole(std::string role) {
649 g_traceLog.addRole(role);
650 }
651
removeTraceRole(std::string role)652 void removeTraceRole(std::string role) {
653 g_traceLog.removeRole(role);
654 }
655
TraceEvent(const char * type,UID id)656 TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true) {}
TraceEvent(Severity severity,const char * type,UID id)657 TraceEvent::TraceEvent( Severity severity, const char* type, UID id )
658 : id(id), type(type), severity(severity), initialized(false),
659 enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {}
TraceEvent(TraceInterval & interval,UID id)660 TraceEvent::TraceEvent( TraceInterval& interval, UID id )
661 : id(id), type(interval.type)
662 , severity(interval.severity)
663 , initialized(false)
664 , enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity) {
665 init(interval);
666 }
TraceEvent(Severity severity,TraceInterval & interval,UID id)667 TraceEvent::TraceEvent( Severity severity, TraceInterval& interval, UID id )
668 : id(id), type(interval.type),
669 severity(severity),
670 initialized(false),
671 enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
672 init(interval);
673 }
674
init(TraceInterval & interval)675 bool TraceEvent::init( TraceInterval& interval ) {
676 bool result = init();
677 switch (interval.count++) {
678 case 0: { detail("BeginPair", interval.pairID); break; }
679 case 1: { detail("EndPair", interval.pairID); break; }
680 default: ASSERT(false);
681 }
682 return result;
683 }
684
init()685 bool TraceEvent::init() {
686 if(initialized) {
687 return enabled;
688 }
689 initialized = true;
690
691 ASSERT(*type != '\0');
692 enabled = enabled && ( !g_network || severity >= FLOW_KNOBS->MIN_TRACE_SEVERITY );
693
694 // Backstop to throttle very spammy trace events
695 if (enabled && g_network && !g_network->isSimulated() && severity > SevDebug && isNetworkThread()) {
696 if (traceEventThrottlerCache->isAboveThreshold(StringRef((uint8_t*)type, strlen(type)))) {
697 enabled = false;
698 TraceEvent(SevWarnAlways, std::string(TRACE_EVENT_THROTTLE_STARTING_TYPE).append(type).c_str()).suppressFor(5);
699 }
700 else {
701 traceEventThrottlerCache->addAndExpire(StringRef((uint8_t*)type, strlen(type)), 1, now() + FLOW_KNOBS->TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY);
702 }
703 }
704
705 if(enabled) {
706 tmpEventMetric = new DynamicEventMetric(MetricNameRef());
707
708 double time;
709 if(g_trace_clock == TRACE_CLOCK_NOW) {
710 if(!g_network) {
711 static double preNetworkTime = timer_monotonic();
712 time = preNetworkTime;
713 }
714 else {
715 time = now();
716 }
717 }
718 else {
719 time = timer();
720 }
721
722 if(err.isValid() && err.isInjectedFault() && severity == SevError) {
723 severity = SevWarnAlways;
724 }
725
726 detail("Severity", int(severity));
727 detailf("Time", "%.6f", time);
728 detail("Type", type);
729 if(g_network && g_network->isSimulated()) {
730 NetworkAddress local = g_network->getLocalAddress();
731 detail("Machine", formatIpPort(local.ip, local.port));
732 }
733 detail("ID", id);
734 if(err.isValid()) {
735 if (err.isInjectedFault()) {
736 detail("ErrorIsInjectedFault", true);
737 }
738 detail("Error", err.name());
739 detail("ErrorDescription", err.what());
740 detail("ErrorCode", err.code());
741 }
742 } else {
743 tmpEventMetric = nullptr;
744 }
745
746 return enabled;
747 }
748
errorImpl(class Error const & error,bool includeCancelled)749 TraceEvent& TraceEvent::errorImpl(class Error const& error, bool includeCancelled) {
750 if (error.code() != error_code_actor_cancelled || includeCancelled) {
751 err = error;
752 if (initialized) {
753 if (error.isInjectedFault()) {
754 detail("ErrorIsInjectedFault", true);
755 if(severity == SevError) severity = SevWarnAlways;
756 }
757 detail("Error", error.name());
758 detail("ErrorDescription", error.what());
759 detail("ErrorCode", error.code());
760 }
761 } else {
762 if (initialized) {
763 TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, std::string(TRACE_EVENT_INVALID_SUPPRESSION).append(type).c_str()).suppressFor(5);
764 } else {
765 enabled = false;
766 }
767 }
768 return *this;
769 }
770
detailImpl(std::string && key,std::string && value,bool writeEventMetricField)771 TraceEvent& TraceEvent::detailImpl( std::string&& key, std::string&& value, bool writeEventMetricField) {
772 init();
773 if (enabled) {
774 if( value.size() > 495 ) {
775 value = value.substr(0, 495) + "...";
776 }
777
778 if(writeEventMetricField) {
779 tmpEventMetric->setField(key.c_str(), Standalone<StringRef>(StringRef(value)));
780 }
781
782 fields.addField(std::move(key), std::move(value));
783
784 if(fields.sizeBytes() > TRACE_EVENT_MAX_SIZE) {
785 TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, "TraceEventOverflow").detail("TraceFirstBytes", fields.toString().substr(300));
786 enabled = false;
787 }
788 }
789 return *this;
790 }
791
setField(const char * key,int64_t value)792 void TraceEvent::setField(const char* key, int64_t value) {
793 tmpEventMetric->setField(key, value);
794 }
795
setField(const char * key,double value)796 void TraceEvent::setField(const char* key, double value) {
797 tmpEventMetric->setField(key, value);
798 }
799
setField(const char * key,const std::string & value)800 void TraceEvent::setField(const char* key, const std::string& value) {
801 tmpEventMetric->setField(key, Standalone<StringRef>(value));
802 }
803
detailf(std::string key,const char * valueFormat,...)804 TraceEvent& TraceEvent::detailf( std::string key, const char* valueFormat, ... ) {
805 if (enabled) {
806 va_list args;
807 va_start(args, valueFormat);
808 std::string value;
809 int result = vsformat(value, valueFormat, args);
810 va_end(args);
811
812 ASSERT(result >= 0);
813 detailImpl(std::move(key), std::move(value));
814 }
815 return *this;
816 }
detailfNoMetric(std::string && key,const char * valueFormat,...)817 TraceEvent& TraceEvent::detailfNoMetric( std::string&& key, const char* valueFormat, ... ) {
818 if (enabled) {
819 va_list args;
820 va_start(args, valueFormat);
821 std::string value;
822 int result = vsformat(value, valueFormat, args);
823 va_end(args);
824
825 ASSERT(result >= 0);
826 detailImpl(std::move(key), std::move(value), false); // Do NOT write this detail to the event metric, caller of detailfNoMetric should do that itself with the appropriate value type
827 }
828 return *this;
829 }
830
trackLatest(const char * trackingKey)831 TraceEvent& TraceEvent::trackLatest( const char *trackingKey ){
832 this->trackingKey = trackingKey;
833 ASSERT( this->trackingKey.size() != 0 && this->trackingKey[0] != '/' && this->trackingKey[0] != '\\');
834 return *this;
835 }
836
sample(double sampleRate,bool logSampleRate)837 TraceEvent& TraceEvent::sample( double sampleRate, bool logSampleRate ) {
838 if(enabled) {
839 if(initialized) {
840 TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, std::string(TRACE_EVENT_INVALID_SUPPRESSION).append(type).c_str()).suppressFor(5);
841 return *this;
842 }
843
844 if(!g_random) {
845 sampleRate = 1.0;
846 }
847 else {
848 enabled = enabled && g_random->random01() < sampleRate;
849 }
850
851 if(enabled && logSampleRate) {
852 detail("SampleRate", sampleRate);
853 }
854 }
855
856 return *this;
857 }
858
suppressFor(double duration,bool logSuppressedEventCount)859 TraceEvent& TraceEvent::suppressFor( double duration, bool logSuppressedEventCount ) {
860 if(enabled) {
861 if(initialized) {
862 TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, std::string(TRACE_EVENT_INVALID_SUPPRESSION).append(type).c_str()).suppressFor(5);
863 return *this;
864 }
865
866 if(g_network) {
867 if(isNetworkThread()) {
868 int64_t suppressedEventCount = suppressedEvents.checkAndInsertSuppression(type, duration);
869 enabled = enabled && suppressedEventCount >= 0;
870 if(enabled && logSuppressedEventCount) {
871 detail("SuppressedEventCount", suppressedEventCount);
872 }
873 }
874 else {
875 TraceEvent(SevWarnAlways, "SuppressionFromNonNetworkThread").detail("Type", type);
876 detail("__InvalidSuppression__", ""); // Choosing a detail name that is unlikely to collide with other names
877 }
878 }
879 init(); //we do not want any future calls on this trace event to disable it, because we have already counted it towards our suppression budget
880 }
881
882 return *this;
883 }
884
GetLastError()885 TraceEvent& TraceEvent::GetLastError() {
886 #ifdef _WIN32
887 return detailf("WinErrorCode", "%x", ::GetLastError());
888 #elif defined(__unixish__)
889 return detailf("UnixErrorCode", "%x", errno).detail("UnixError", strerror(errno));
890 #endif
891 }
892
// Per-severity counters of logged events, indexed by severity/10.
// We're cheating in counting, as in practice, we only use {10,20,30,40}.
// The static_assert ties the array size to SevMaxUsed so a new severity cannot index past the end.
static_assert(SevMaxUsed / 10 + 1 == 5, "Please bump eventCounts[5] to SevMaxUsed/10+1");
unsigned long TraceEvent::eventCounts[5] = {0,0,0,0,0};
896
CountEventsLoggedAt(Severity sev)897 unsigned long TraceEvent::CountEventsLoggedAt(Severity sev) {
898 return TraceEvent::eventCounts[sev/10];
899 }
900
backtrace(const std::string & prefix)901 TraceEvent& TraceEvent::backtrace(const std::string& prefix) {
902 if (this->severity == SevError || !enabled) return *this; // We'll backtrace this later in ~TraceEvent
903 return detail(prefix + "Backtrace", platform::get_backtrace());
904 }
905
~TraceEvent()906 TraceEvent::~TraceEvent() {
907 init();
908 try {
909 if (enabled) {
910 if (this->severity == SevError) {
911 severity = SevInfo;
912 backtrace();
913 severity = SevError;
914 }
915
916 TraceEvent::eventCounts[severity/10]++;
917 g_traceLog.writeEvent( fields, trackingKey, severity > SevWarnAlways );
918
919 if (g_traceLog.isOpen()) {
920 // Log Metrics
921 if(g_traceLog.logTraceEventMetrics && isNetworkThread()) {
922 // Get the persistent Event Metric representing this trace event and push the fields (details) accumulated in *this to it and then log() it.
923 // Note that if the event metric is disabled it won't actually be logged BUT any new fields added to it will be registered.
924 // If the event IS logged, a timestamp will be returned, if not then 0. Either way, pass it through to be used if possible
925 // in the Sev* event metrics.
926
927 uint64_t event_ts = DynamicEventMetric::getOrCreateInstance(format("TraceEvent.%s", type), StringRef(), true)->setFieldsAndLogFrom(tmpEventMetric);
928 g_traceLog.log(severity, type, id, event_ts);
929 }
930 }
931 }
932 } catch( Error &e ) {
933 TraceEvent(SevError, "TraceEventDestructorError").error(e,true);
934 }
935 delete tmpEventMetric;
936 }
937
938 thread_local bool TraceEvent::networkThread = false;
939
setNetworkThread()940 void TraceEvent::setNetworkThread() {
941 traceEventThrottlerCache = new TransientThresholdMetricSample<Standalone<StringRef>>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT);
942 networkThread = true;
943 }
944
isNetworkThread()945 bool TraceEvent::isNetworkThread() {
946 return networkThread;
947 }
948
begin()949 TraceInterval& TraceInterval::begin() {
950 pairID = trace_random->randomUniqueID();
951 count = 0;
952 return *this;
953 }
954
addEvent(const char * name,uint64_t id,const char * location)955 void TraceBatch::addEvent( const char *name, uint64_t id, const char *location ) {
956 eventBatch.push_back( EventInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), name, id, location));
957 if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
958 dump();
959 }
960
addAttach(const char * name,uint64_t id,uint64_t to)961 void TraceBatch::addAttach( const char *name, uint64_t id, uint64_t to ) {
962 attachBatch.push_back( AttachInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), name, id, to));
963 if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
964 dump();
965 }
966
addBuggify(int activated,int line,std::string file)967 void TraceBatch::addBuggify( int activated, int line, std::string file ) {
968 if( g_network ) {
969 buggifyBatch.push_back( BuggifyInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), activated, line, file));
970 if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
971 dump();
972 } else {
973 buggifyBatch.push_back( BuggifyInfo(0, activated, line, file));
974 }
975 }
976
dump()977 void TraceBatch::dump() {
978 if (!g_traceLog.isOpen())
979 return;
980 std::string machine;
981 if(g_network->isSimulated()) {
982 NetworkAddress local = g_network->getLocalAddress();
983 machine = formatIpPort(local.ip, local.port);
984 }
985
986 for(int i = 0; i < attachBatch.size(); i++) {
987 if(g_network->isSimulated()) {
988 attachBatch[i].fields.addField("Machine", machine);
989 }
990 g_traceLog.writeEvent(attachBatch[i].fields, "", false);
991 }
992
993 for(int i = 0; i < eventBatch.size(); i++) {
994 if(g_network->isSimulated()) {
995 eventBatch[i].fields.addField("Machine", machine);
996 }
997 g_traceLog.writeEvent(eventBatch[i].fields, "", false);
998 }
999
1000 for(int i = 0; i < buggifyBatch.size(); i++) {
1001 if(g_network->isSimulated()) {
1002 buggifyBatch[i].fields.addField("Machine", machine);
1003 }
1004 g_traceLog.writeEvent(buggifyBatch[i].fields, "", false);
1005 }
1006
1007 g_traceLog.flush();
1008 eventBatch.clear();
1009 attachBatch.clear();
1010 buggifyBatch.clear();
1011 }
1012
EventInfo(double time,const char * name,uint64_t id,const char * location)1013 TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, const char *location) {
1014 fields.addField("Severity", format("%d", (int)SevInfo));
1015 fields.addField("Time", format("%.6f", time));
1016 fields.addField("Type", name);
1017 fields.addField("ID", format("%016" PRIx64, id));
1018 fields.addField("Location", location);
1019 }
1020
AttachInfo(double time,const char * name,uint64_t id,uint64_t to)1021 TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, uint64_t to) {
1022 fields.addField("Severity", format("%d", (int)SevInfo));
1023 fields.addField("Time", format("%.6f", time));
1024 fields.addField("Type", name);
1025 fields.addField("ID", format("%016" PRIx64, id));
1026 fields.addField("To", format("%016" PRIx64, to));
1027 }
1028
BuggifyInfo(double time,int activated,int line,std::string file)1029 TraceBatch::BuggifyInfo::BuggifyInfo(double time, int activated, int line, std::string file) {
1030 fields.addField("Severity", format("%d", (int)SevInfo));
1031 fields.addField("Time", format("%.6f", time));
1032 fields.addField("Type", "BuggifySection");
1033 fields.addField("Activated", format("%d", activated));
1034 fields.addField("File", std::move(file));
1035 fields.addField("Line", format("%d", line));
1036 }
1037
TraceEventFields()1038 TraceEventFields::TraceEventFields() : bytes(0) {}
1039
addField(const std::string & key,const std::string & value)1040 void TraceEventFields::addField(const std::string& key, const std::string& value) {
1041 bytes += key.size() + value.size();
1042 fields.push_back(std::make_pair(key, value));
1043 }
1044
addField(std::string && key,std::string && value)1045 void TraceEventFields::addField(std::string&& key, std::string&& value) {
1046 bytes += key.size() + value.size();
1047 fields.push_back(std::make_pair(std::move(key), std::move(value)));
1048 }
1049
size() const1050 size_t TraceEventFields::size() const {
1051 return fields.size();
1052 }
1053
sizeBytes() const1054 size_t TraceEventFields::sizeBytes() const {
1055 return bytes;
1056 }
1057
begin() const1058 TraceEventFields::FieldIterator TraceEventFields::begin() const {
1059 return fields.cbegin();
1060 }
1061
end() const1062 TraceEventFields::FieldIterator TraceEventFields::end() const {
1063 return fields.cend();
1064 }
1065
operator [](int index) const1066 const TraceEventFields::Field &TraceEventFields::operator[] (int index) const {
1067 ASSERT(index >= 0 && index < size());
1068 return fields.at(index);
1069 }
1070
tryGetValue(std::string key,std::string & outValue) const1071 bool TraceEventFields::tryGetValue(std::string key, std::string &outValue) const {
1072 for(auto itr = begin(); itr != end(); ++itr) {
1073 if(itr->first == key) {
1074 outValue = itr->second;
1075 return true;
1076 }
1077 }
1078
1079 return false;
1080 }
1081
getValue(std::string key) const1082 std::string TraceEventFields::getValue(std::string key) const {
1083 std::string value;
1084 if(tryGetValue(key, value)) {
1085 return value;
1086 }
1087 else {
1088 TraceEvent ev(SevWarn, "TraceEventFieldNotFound");
1089 if(tryGetValue("Type", value)) {
1090 ev.detail("Event", value);
1091 }
1092 ev.detail("FieldName", key);
1093
1094 throw attribute_not_found();
1095 }
1096 }
1097
1098 namespace {
parseNumericValue(std::string const & s,double & outValue,bool permissive=false)1099 void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
1100 double d = 0;
1101 int consumed = 0;
1102 int r = sscanf(s.c_str(), "%lf%n", &d, &consumed);
1103 if (r == 1 && (consumed == s.size() || permissive)) {
1104 outValue = d;
1105 return;
1106 }
1107
1108 throw attribute_not_found();
1109 }
1110
parseNumericValue(std::string const & s,int & outValue,bool permissive=false)1111 void parseNumericValue(std::string const& s, int &outValue, bool permissive = false) {
1112 long long int iLong = 0;
1113 int consumed = 0;
1114 int r = sscanf(s.c_str(), "%lld%n", &iLong, &consumed);
1115 if (r == 1 && (consumed == s.size() || permissive)) {
1116 if (std::numeric_limits<int>::min() <= iLong && iLong <= std::numeric_limits<int>::max()) {
1117 outValue = (int)iLong; // Downcast definitely safe
1118 return;
1119 }
1120 else {
1121 throw attribute_too_large();
1122 }
1123 }
1124
1125 throw attribute_not_found();
1126 }
1127
parseNumericValue(std::string const & s,int64_t & outValue,bool permissive=false)1128 void parseNumericValue(std::string const& s, int64_t &outValue, bool permissive = false) {
1129 long long int i = 0;
1130 int consumed = 0;
1131 int r = sscanf(s.c_str(), "%lld%n", &i, &consumed);
1132 if (r == 1 && (consumed == s.size() || permissive)) {
1133 outValue = i;
1134 return;
1135 }
1136
1137 throw attribute_not_found();
1138 }
1139
1140 template<class T>
getNumericValue(TraceEventFields const & fields,std::string key,bool permissive)1141 T getNumericValue(TraceEventFields const& fields, std::string key, bool permissive) {
1142 std::string field = fields.getValue(key);
1143
1144 try {
1145 T value;
1146 parseNumericValue(field, value, permissive);
1147 return value;
1148 }
1149 catch(Error &e) {
1150 std::string type;
1151
1152 TraceEvent ev(SevWarn, "ErrorParsingNumericTraceEventField");
1153 ev.error(e);
1154 if(fields.tryGetValue("Type", type)) {
1155 ev.detail("Event", type);
1156 }
1157 ev.detail("FieldName", key);
1158 ev.detail("FieldValue", field);
1159
1160 throw;
1161 }
1162 }
1163 } // namespace
1164
getInt(std::string key,bool permissive) const1165 int TraceEventFields::getInt(std::string key, bool permissive) const {
1166 return getNumericValue<int>(*this, key, permissive);
1167 }
1168
getInt64(std::string key,bool permissive) const1169 int64_t TraceEventFields::getInt64(std::string key, bool permissive) const {
1170 return getNumericValue<int64_t>(*this, key, permissive);
1171 }
1172
getDouble(std::string key,bool permissive) const1173 double TraceEventFields::getDouble(std::string key, bool permissive) const {
1174 return getNumericValue<double>(*this, key, permissive);
1175 }
1176
toString() const1177 std::string TraceEventFields::toString() const {
1178 std::string str;
1179 bool first = true;
1180 for(auto itr = begin(); itr != end(); ++itr) {
1181 if(!first) {
1182 str += ", ";
1183 }
1184 first = false;
1185
1186 str += format("\"%s\"=\"%s\"", itr->first.c_str(), itr->second.c_str());
1187 }
1188
1189 return str;
1190 }
1191
// Checks a trace field or type name against the naming rules enforced here:
//  - the first character must be an uppercase ASCII letter or '_';
//  - any underscore makes the name invalid unless allowUnderscores is set;
//  - with allowUnderscores, each underscore must be followed by an uppercase ASCII
//    letter, unless the name itself starts with '_'.
// NOTE(review): the "name starts with '_'" exemption mirrors the original test on
// key[0]; it may have been intended to inspect the character after the underscore
// instead. Behavior is kept as-is pending confirmation.
bool validateField(const char *key, bool allowUnderscores) {
	const char first = key[0];
	const bool startsWithUpper = first >= 'A' && first <= 'Z';
	if(!startsWithUpper && first != '_') {
		return false;
	}

	// Reaching this point means `first` is not '\0', so only the '_' test matters below.
	const bool leadingUnderscore = first == '_';

	for(const char* pos = strchr(key, '_'); pos != nullptr; pos = strchr(pos + 1, '_')) {
		if(!allowUnderscores) {
			return false;
		}

		const char next = pos[1];
		const bool nextIsUpper = next >= 'A' && next <= 'Z';
		if(!nextIsUpper && !leadingUnderscore) {
			return false;
		}
	}

	return true;
}
1208
validateFormat() const1209 void TraceEventFields::validateFormat() const {
1210 if(g_network && g_network->isSimulated()) {
1211 for(Field field : fields) {
1212 if(!validateField(field.first.c_str(), false)) {
1213 fprintf(stderr, "Trace event detail name `%s' is invalid in:\n\t%s\n", field.first.c_str(), toString().c_str());
1214 }
1215 if(field.first == "Type" && !validateField(field.second.c_str(), true)) {
1216 fprintf(stderr, "Trace event detail Type `%s' is invalid\n", field.second.c_str());
1217 }
1218 }
1219 }
1220 }
1221