1 /*
2 * TDMetric.actor.h
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #pragma once
22
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_TDMETRIC_ACTOR_G_H)
25 #define FLOW_TDMETRIC_ACTOR_G_H
26 #include "flow/TDMetric.actor.g.h"
27 #elif !defined(FLOW_TDMETRIC_ACTOR_H)
28 #define FLOW_TDMETRIC_ACTOR_H
29
30 #include "flow/flow.h"
31 #include "flow/IndexedSet.h"
32 #include "flow/network.h"
33 #include "flow/Knobs.h"
34 #include "flow/genericactors.actor.h"
35 #include "flow/CompressedInt.h"
36 #include <algorithm>
37 #include <functional>
38 #include <cmath>
39 #include "flow/actorcompiler.h" // This must be the last #include.
40
41 struct MetricNameRef {
MetricNameRefMetricNameRef42 MetricNameRef() {}
MetricNameRefMetricNameRef43 MetricNameRef(const StringRef& type, const StringRef& name, const StringRef &id)
44 : type(type), name(name), id(id) {
45 }
MetricNameRefMetricNameRef46 MetricNameRef(Arena& a, const MetricNameRef& copyFrom)
47 : type(a, copyFrom.type), name(a, copyFrom.name), id(a, copyFrom.id) {
48 }
49
50 StringRef type, name, id;
51
toStringMetricNameRef52 std::string toString() const {
53 return format("(%s,%s,%s,%s)", type.toString().c_str(), name.toString().c_str(), id.toString().c_str());
54 }
55
expectedSizeMetricNameRef56 int expectedSize() const {
57 return type.expectedSize() + name.expectedSize();
58 }
59 };
60
61 extern std::string reduceFilename(std::string const &filename);
62 inline bool operator < (const MetricNameRef& l, const MetricNameRef& r ) {
63 int cmp = l.type.compare(r.type);
64 if(cmp == 0) {
65 cmp = l.name.compare(r.name);
66 if(cmp == 0)
67 cmp = l.id.compare(r.id);
68 }
69 return cmp < 0;
70 }
71
72 inline bool operator == (const MetricNameRef& l, const MetricNameRef& r ) {
73 return l.type == r.type && l.name == r.name && l.id == r.id;
74 }
75
76 inline bool operator != (const MetricNameRef& l, const MetricNameRef& r ) {
77 return !(l == r);
78 }
79
80 struct KeyWithWriter {
81 Standalone<StringRef> key;
82 BinaryWriter writer;
83 int writerOffset;
84
keyKeyWithWriter85 KeyWithWriter( Standalone<StringRef> const& key, BinaryWriter& writer, int writerOffset = 0) : key(key), writer(std::move(writer)), writerOffset(writerOffset) {}
KeyWithWriterKeyWithWriter86 KeyWithWriter( KeyWithWriter&& r ) : key(std::move(r.key)), writer(std::move(r.writer)), writerOffset(r.writerOffset) {}
87 void operator=( KeyWithWriter&& r ) { key = std::move(r.key); writer = std::move(r.writer); writerOffset = r.writerOffset; }
88
valueKeyWithWriter89 StringRef value() {
90 return StringRef(writer.toValue().substr(writerOffset));
91 }
92 };
93
94 // This is a very minimal interface for getting metric data from the DB which is needed
95 // to support continuing existing metric data series.
96 // It's lack of generality is intentional.
97 class IMetricDB {
98 public:
~IMetricDB()99 virtual ~IMetricDB() {}
100
101 // key should be the result of calling metricKey or metricFieldKey with time = 0
102 virtual Future<Optional<Standalone<StringRef>>> getLastBlock(Standalone<StringRef> key) = 0;
103 };
104
105 // Key generator for metric keys for various things.
106 struct MetricKeyRef {
MetricKeyRefMetricKeyRef107 MetricKeyRef() : level(-1) {}
MetricKeyRefMetricKeyRef108 MetricKeyRef(Arena& a, const MetricKeyRef& copyFrom)
109 : prefix(a, copyFrom.prefix), name(a, copyFrom.name), address(a, copyFrom.address),
110 fieldName(a, copyFrom.fieldName), fieldType(a, copyFrom.fieldType), level(copyFrom.level) {
111 }
112
113 StringRef prefix;
114 MetricNameRef name;
115 StringRef address;
116 StringRef fieldName;
117 StringRef fieldType;
118 uint64_t level;
119
expectedSizeMetricKeyRef120 int expectedSize() const {
121 return prefix.expectedSize() + name.expectedSize() + address.expectedSize() + fieldName.expectedSize() + fieldType.expectedSize();
122 }
123
withFieldMetricKeyRef124 template <typename T> inline MetricKeyRef withField(const T &field) const {
125 MetricKeyRef mk(*this);
126 mk.fieldName = field.name();
127 mk.fieldType = field.typeName();
128 return mk;
129 }
130
131 const Standalone<StringRef> packLatestKey() const;
132 const Standalone<StringRef> packDataKey(int64_t time = -1) const;
133 const Standalone<StringRef> packFieldRegKey() const;
134
isFieldMetricKeyRef135 bool isField() const { return fieldName.size() > 0 && fieldType.size() > 0; }
136 void writeField(BinaryWriter &wr) const;
137 void writeMetricName(BinaryWriter &wr) const;
138 };
139
140 struct MetricUpdateBatch {
141 std::vector<KeyWithWriter> inserts;
142 std::vector<KeyWithWriter> appends;
143 std::vector<std::pair<Standalone<StringRef>,Standalone<StringRef>>> updates;
144 std::vector<std::function<Future<Void>(IMetricDB *, MetricUpdateBatch *)>> callbacks;
145
clearMetricUpdateBatch146 void clear() {
147 inserts.clear();
148 appends.clear();
149 updates.clear();
150 callbacks.clear();
151 }
152 };
153
154 template<typename T>
metricTypeName()155 inline const StringRef metricTypeName() {
156 // If this function does not compile then T is not a supported metric type
157 return T::metric_field_type();
158 }
159 #define MAKE_TYPENAME(T, S) template<> inline const StringRef metricTypeName<T>() { return LiteralStringRef(S); }
160 MAKE_TYPENAME(bool, "Bool")
161 MAKE_TYPENAME(int64_t, "Int64")
162 MAKE_TYPENAME(double, "Double")
163 MAKE_TYPENAME(Standalone<StringRef>, "String")
164 #undef MAKE_TYPENAME
165
166 struct BaseMetric;
167
168 // The collection of metrics that exist for a single process, at a single address.
169 class TDMetricCollection {
170 public:
TDMetricCollection()171 TDMetricCollection() : currentTimeBytes(0) {}
172
173 // Metric Name to reference to its instance
174 Map<Standalone<MetricNameRef>, Reference<BaseMetric>, MapPair<Standalone<MetricNameRef>, Reference<BaseMetric>>, int> metricMap;
175
176 AsyncTrigger metricAdded;
177 AsyncTrigger metricEnabled;
178 AsyncTrigger metricRegistrationChanged;
179
180 // Initialize the collection. Once this returns true, metric data can be written to a database. Note that metric data can be logged
181 // before that time, just not written to a database.
init()182 bool init() {
183 // Get and store the local address in the metric collection, but only if it is not 0.0.0.0:0
184 if( address.size() == 0 ) {
185 NetworkAddress addr = g_network->getLocalAddress();
186 if (addr.ip.isValid() && addr.port != 0) address = StringRef(addr.toString());
187 }
188 return address.size() != 0;
189 }
190
191 // Returns the TDMetrics that the calling process should use
getTDMetrics()192 static TDMetricCollection* getTDMetrics() {
193 if(g_network == nullptr)
194 return nullptr;
195 return static_cast<TDMetricCollection*>((void*) g_network->global(INetwork::enTDMetrics));
196 }
197
198 Deque<uint64_t> rollTimes;
199 int64_t currentTimeBytes;
200 Standalone<StringRef> address;
201
202 void checkRoll(uint64_t t, int64_t usedBytes);
203 bool canLog(int level);
204 };
205
206 struct MetricData {
207 uint64_t start;
208 uint64_t rollTime;
209 uint64_t appendStart;
210 BinaryWriter writer;
211
212 explicit MetricData(uint64_t appendStart = 0) :
writerMetricData213 writer(AssumeVersion(currentProtocolVersion)),
214 start(0),
215 rollTime(std::numeric_limits<uint64_t>::max()),
216 appendStart(appendStart) {
217 }
218
MetricDataMetricData219 MetricData( MetricData&& r ) BOOST_NOEXCEPT :
220 start(r.start),
221 rollTime(r.rollTime),
222 appendStart(r.appendStart),
223 writer(std::move(r.writer)) {
224 }
225
226 void operator=( MetricData&& r ) BOOST_NOEXCEPT {
227 start = r.start; rollTime = r.rollTime; appendStart = r.appendStart; writer = std::move(r.writer);
228 }
229
230 std::string toString();
231 };
232
233 // Some common methods to reduce code redundancy across different metric definitions
234 template<typename T, typename _ValueType = Void>
235 struct MetricUtil {
236 typedef _ValueType ValueType;
237 typedef T MetricType;
238
239 // Looks up a metric by name and id and returns a reference to it if it exists.
240 // Empty names will not be looked up.
241 // If create is true then a metric will be created with the given initial value if one could not be found to return.
242 // If a metric is created and name is not empty then the metric will be placed in the collection.
243 static Reference<T> getOrCreateInstance(StringRef const& name, StringRef const &id = StringRef(), bool create = false, ValueType initial = ValueType()) {
244 Reference<T> m;
245 TDMetricCollection *collection = TDMetricCollection::getTDMetrics();
246
247 // If there is a metric collect and this metric has a name then look it up in the collection
248 bool useMap = collection != nullptr && name.size() > 0;
249 MetricNameRef mname;
250
251 if(useMap) {
252 mname = MetricNameRef(T::metricType, name, id);
253 auto mi = collection->metricMap.find(mname);
254 if(mi != collection->metricMap.end()) {
255 m = mi->value.castTo<T>();
256 }
257 }
258
259 // If we don't have a valid metric reference yet and the create flag was set then create one and possibly put it in the map
260 if(!m && create) {
261 // Metric not found in collection but create is set then create it in the map
262 m = Reference<T>(new T(mname, initial));
263 if(useMap) {
264 collection->metricMap[mname] = m.template castTo<BaseMetric>();
265 collection->metricAdded.trigger();
266 }
267 }
268
269 return m;
270 }
271
272 // Lookup the T metric by name and return its value (or nullptr if it doesn't exist)
lookupMetricMetricUtil273 static T * lookupMetric(MetricNameRef const &name) {
274 auto it = T::metricMap().find(name);
275 if(it != T::metricMap().end())
276 return it->value;
277 return nullptr;
278 }
279 };
280
281 // index_sequence implementation since VS2013 doesn't have it yet
282 template <size_t... Ints> class index_sequence {
283 public:
size()284 static size_t size() { return sizeof...(Ints); }
285 };
286
287 template <size_t Start, typename Indices, size_t End>
288 struct make_index_sequence_impl;
289
290 template <size_t Start, size_t... Indices, size_t End>
291 struct make_index_sequence_impl<Start, index_sequence<Indices...>, End> {
292 typedef typename make_index_sequence_impl<
293 Start + 1, index_sequence<Indices..., Start>, End>::type type;
294 };
295
296 template <size_t End, size_t... Indices>
297 struct make_index_sequence_impl<End, index_sequence<Indices...>, End> {
298 typedef index_sequence<Indices...> type;
299 };
300
301 // The code that actually implements tuple_map
302 template <size_t I, typename F, typename... Tuples>
303 auto tuple_zip_invoke(F f, const Tuples &... ts) -> decltype( f(std::get<I>(ts)...) ) {
304 return f(std::get<I>(ts)...);
305 }
306
307 template <typename F, size_t... Is, typename... Tuples>
308 auto tuple_map_impl(F f, index_sequence<Is...>, const Tuples &... ts) -> decltype( std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...) ) {
309 return std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...);
310 }
311
312 // tuple_map( f(a,b), (a1,a2,a3), (b1,b2,b3) ) = (f(a1,b1), f(a2,b2), f(a3,b3))
313 template <typename F, typename Tuple, typename... Tuples>
314 auto tuple_map(F f, const Tuple &t, const Tuples &... ts) -> decltype( tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...) ) {
315 return tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...);
316 }
317
318 template <class T>
319 struct Descriptor {
320 #ifndef NO_INTELLISENSE
321 using fields = std::tuple<>;
322 typedef make_index_sequence_impl<0, index_sequence<>, std::tuple_size<fields>::value>::type field_indexes;
323
324 static StringRef typeName() {{ return LiteralStringRef(""); }}
325 #endif
326 };
327
328 // FieldHeader is a serializable (FIXED SIZE!) and updatable Header type for Metric field levels.
329 // Update is via += with either a T or another FieldHeader
330 // Default implementation is sufficient for ints and doubles
331 template<typename T>
332 struct FieldHeader {
333 FieldHeader() : version(1), count(0), sum(0) {}
334 uint8_t version;
335 int64_t count;
336 // sum is a T if T is arithmetic, otherwise it's an int64_t
337 typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type sum;
338
339 void update(FieldHeader const &h) {
340 count += h.count;
341 sum += h.sum;
342 }
343 void update(T const &v) {
344 ++count;
345 sum += v;
346 }
347 template<class Ar> void serialize(Ar &ar) {
348 serializer(ar, version);
349 ASSERT(version == 1);
350 serializer(ar, count, sum);
351 }
352 };
353
354 template <> inline void FieldHeader<Standalone<StringRef>>::update(Standalone<StringRef> const &v) {
355 ++count;
356 sum += v.size();
357 }
358
359 // FieldValueBlockEncoding is a class for reading and writing encoded field values to and from field
360 // value data blocks. Note that an implementation can be stateful.
361 // Proper usage requires that a single Encoding instance is used to either write all field values to a metric
362 // data block or to read all field values from a metric value block. This usage pattern enables enables
363 // encoding and decoding values as deltas from previous values.
364 //
365 // The default implementation works for ints and writes delta from the previous value.
366 template <typename T>
367 struct FieldValueBlockEncoding {
368 FieldValueBlockEncoding() : prev(0) {}
369 inline void write(BinaryWriter &w, T v) {
370 w << CompressedInt<T>(v - prev);
371 prev = v;
372 }
373 T read(BinaryReader &r) {
374 CompressedInt<T> v;
375 r >> v;
376 prev += v.value;
377 return prev;
378 }
379 T prev;
380 };
381
382 template <>
383 struct FieldValueBlockEncoding<double> {
384 inline void write(BinaryWriter &w, double v) {
385 w << v;
386 }
387 double read(BinaryReader &r) {
388 double v;
389 r >> v;
390 return v;
391 }
392 };
393
394 template <>
395 struct FieldValueBlockEncoding<bool> {
396 inline void write(BinaryWriter &w, bool v) {
397 w.serializeBytes( v ? LiteralStringRef("\x01") : LiteralStringRef("\x00") );
398 }
399 bool read(BinaryReader &r) {
400 uint8_t *v = (uint8_t *)r.readBytes(sizeof(uint8_t));
401 return *v != 0;
402 }
403 };
404
405 // Encoder for strings, writes deltas
406 template <>
407 struct FieldValueBlockEncoding<Standalone<StringRef>> {
408 inline void write(BinaryWriter &w, Standalone<StringRef> const &v) {
409 int reuse = 0;
410 int stop = std::min(v.size(), prev.size());
411 while(reuse < stop && v[reuse] == prev[reuse])
412 ++reuse;
413 w << CompressedInt<int>(reuse) << CompressedInt<int>(v.size() - reuse);
414 if(v.size() > reuse)
415 w.serializeBytes(v.substr(reuse));
416 prev = v;
417 }
418 Standalone<StringRef> read(BinaryReader &r) {
419 CompressedInt<int> reuse;
420 CompressedInt<int> extra;
421 r >> reuse >> extra;
422 ASSERT(reuse.value >= 0 && extra.value >= 0 && reuse.value <= prev.size());
423 Standalone<StringRef> v = makeString(reuse.value + extra.value);
424 memcpy(mutateString(v), prev.begin(), reuse.value);
425 memcpy(mutateString(v) + reuse.value, r.readBytes(extra.value), extra.value);
426 prev = v;
427 return v;
428 }
429 // Using a Standalone<StringRef> for prev is efficient for writing but not great for reading.
430 Standalone<StringRef> prev;
431 };
432
433
434 // Field level for value type of T using header type of Header. Default header type is the default FieldHeader implementation for type T.
435 template <class T, class Header = FieldHeader<T>, class Encoder = FieldValueBlockEncoding<T>>
436 struct FieldLevel {
437
438 Deque<MetricData> metrics;
439 int64_t appendUsed;
440 Header header;
441
442 // The previous header and the last timestamp at which an out going MetricData block requires header patching
443 Optional<Header> previousHeader;
444 uint64_t lastTimeRequiringHeaderPatch;
445
446 Encoder enc;
447
448 explicit FieldLevel() : appendUsed(0) {
449 metrics.emplace_back(MetricData());
450 metrics.back().writer << header;
451 }
452
453 FieldLevel(FieldLevel &&f)
454 : metrics(std::move(f.metrics)), appendUsed(f.appendUsed), enc(f.enc), header(f.header),
455 previousHeader(f.previousHeader), lastTimeRequiringHeaderPatch(f.lastTimeRequiringHeaderPatch) {
456 }
457
458 // update Header, use Encoder to write T v
459 void log( T v, uint64_t t, bool& overflow, int64_t& bytes ) {
460 int lastLength = metrics.back().writer.getLength();
461 if( metrics.back().start == 0 )
462 metrics.back().start = t;
463
464 header.update(v);
465 enc.write(metrics.back().writer, v);
466
467 bytes += metrics.back().writer.getLength() - lastLength;
468 if(lastLength + appendUsed > FLOW_KNOBS->MAX_METRIC_SIZE)
469 overflow = true;
470 }
471
472 void nextKey( uint64_t t ) {
473 // If nothing has actually been written to the current block, don't add a new block,
474 // just modify this one if needed so that the next log call will set the ts for this block.
475 auto &m = metrics.back();
476 if(m.start == 0 && m.appendStart == 0)
477 return;
478
479 // This block would have appended but had no data so just reset it to a non-append block instead of adding a new one
480 if(m.appendStart != 0 && m.writer.getLength() == 0) {
481 m.appendStart = 0;
482 m.writer << header;
483 enc = Encoder();
484 return;
485 }
486
487 metrics.back().rollTime = t;
488 metrics.emplace_back(MetricData());
489 metrics.back().writer << header;
490 enc = Encoder();
491 appendUsed = 0;
492 }
493
494 void rollMetric( uint64_t t ) {
495 ASSERT(metrics.size());
496
497 if(metrics.back().start) {
498 metrics.back().rollTime = t;
499 appendUsed += metrics.back().writer.getLength();
500 if(metrics.back().appendStart)
501 metrics.emplace_back(MetricData(metrics.back().appendStart));
502 else
503 metrics.emplace_back(MetricData(metrics.back().start));
504 }
505 }
506
507 // Calculate header as of the end of a value block
508 static Header calculateHeader(StringRef block) {
509 BinaryReader r(block, AssumeVersion(currentProtocolVersion));
510 Header h;
511 r >> h;
512 Encoder dec;
513 while(!r.empty()) {
514 T v = dec.read(r);
515 h.update(v);
516 }
517 return h;
518 }
519
520 // Read header at position, update it with previousHeader, overwrite old header with new header.
521 static void updateSerializedHeader(StringRef buf, const Header &patch) {
522 BinaryReader r(buf, AssumeVersion(currentProtocolVersion));
523 Header h;
524 r >> h;
525 h.update(patch);
526 OverWriter w(mutateString(buf), buf.size(), AssumeVersion(currentProtocolVersion));
527 w << h;
528 }
529
530 // Flushes data blocks in metrics to batch, optionally patching headers if a header is given
531 void flushUpdates(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
532 while(metrics.size()) {
533 auto& data = metrics.front();
534
535 if(data.start != 0 && data.rollTime <= rollTime) {
536 // If this data is to be appended, write it to the batch now.
537 if( data.appendStart ) {
538 batch.appends.push_back(KeyWithWriter(mk.packDataKey(data.appendStart), data.writer));
539 } else {
540 // Otherwise, insert but first, patch the header if this block is old enough
541 if(data.rollTime <= lastTimeRequiringHeaderPatch) {
542 ASSERT(previousHeader.present());
543 FieldLevel<T>::updateSerializedHeader(data.writer.toValue(), previousHeader.get());
544 }
545
546 batch.inserts.push_back(KeyWithWriter(mk.packDataKey(data.start), data.writer));
547 }
548
549 if(metrics.size() == 1) {
550 rollMetric(data.rollTime);
551 metrics.pop_front();
552 break;
553 }
554
555 metrics.pop_front();
556 }
557 else
558 break;
559 }
560 }
561
562 ACTOR static Future<Void> updatePreviousHeader(FieldLevel *self, IMetricDB *db, Standalone<MetricKeyRef> mk, uint64_t rollTime, MetricUpdateBatch *batch) {
563
564 Optional<Standalone<StringRef>> block = wait(db->getLastBlock(mk.packDataKey(-1)));
565
566 // If the block is present, use it
567 if(block.present()) {
568 // Calculate the previous data's final header value
569 Header oldHeader = calculateHeader(block.get());
570
571 // Set the previous header in self to this header for us in patching outgoing blocks
572 self->previousHeader = oldHeader;
573
574 // Update the header in self so the next new block created will be current
575 self->header.update(oldHeader);
576
577 // Any blocks already in the metrics queue will need to be patched at the time that they are
578 // flushed to the DB (which isn't necessarity part of the current flush) so set the last time
579 // that requires a patch to the time of the last MetricData in the queue
580 self->lastTimeRequiringHeaderPatch = self->metrics.back().rollTime;
581 }
582 else {
583 // Otherwise, there is no previous header so no headers need to be updated at all ever.
584 // Set the previous header to an empty header so that flush() sees that this process
585 // has already finished, and set lastTimeRequiringHeaderPatch to 0 since no blocks ever need to be patched.
586 self->previousHeader = Header();
587 self->lastTimeRequiringHeaderPatch = 0;
588 }
589
590 // Now flush the level data up to the rollTime argument and patch anything older than lastTimeRequiringHeaderPatch
591 self->flushUpdates(mk, rollTime, *batch);
592
593 return Void();
594 }
595
596 // Flush this level's data to the output batch.
597 // This function must NOT be called again until any callbacks added to batch have been completed.
598 void flush(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
599 // Don't do anything if there is no data in the queue to flush.
600 if(metrics.empty() || metrics.front().start == 0)
601 return;
602
603 // If the previous header is present then just call flushUpdates now.
604 if(previousHeader.present())
605 return flushUpdates(mk, rollTime, batch);
606
607 Standalone<MetricKeyRef> mkCopy = mk;
608
609 // Previous header is not present so queue a callback which will update it
610 batch.callbacks.push_back([=](IMetricDB *db, MetricUpdateBatch *batch) mutable -> Future<Void> {
611 return updatePreviousHeader(this, db, mkCopy, rollTime, batch);
612 });
613
614 }
615 };
616
617 // A field Description to be used for continuous metrics, whose field name and type should never be accessed
618 struct NullDescriptor {
619 static StringRef name() { return StringRef(); }
620 };
621
622 // Descriptor must have the methods name() and typeName(). They can be either static or member functions (such as for runtime configurability).
623 // Descriptor is inherited so that syntatically Descriptor::fn() works in either case and so that an empty Descriptor with static methods
624 // will take up 0 space. EventField() accepts an optional Descriptor instance.
625 template <class T, class Descriptor = NullDescriptor, class FieldLevelType = FieldLevel<T>>
626 struct EventField : public Descriptor {
627 std::vector<FieldLevelType> levels;
628
629 EventField( EventField&& r ) BOOST_NOEXCEPT : Descriptor(r), levels(std::move(r.levels)) {}
630
631 void operator=( EventField&& r ) BOOST_NOEXCEPT {
632 levels = std::move(r.levels);
633 }
634
635 EventField(Descriptor d = Descriptor()) : Descriptor(d) {
636 }
637
638 static StringRef typeName() { return metricTypeName<T>(); }
639
640 void init() {
641 if(levels.size() != FLOW_KNOBS->MAX_METRIC_LEVEL) {
642 levels.clear();
643 levels.resize(FLOW_KNOBS->MAX_METRIC_LEVEL);
644 }
645 }
646
647 void log( T v, uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) {
648 return levels[l].log(v, t, overflow, bytes);
649 }
650
651 void nextKey( uint64_t t, int level ) {
652 levels[level].nextKey(t);
653 }
654
655 void nextKeyAllLevels( uint64_t t ) {
656 for(int64_t i = 0; i < FLOW_KNOBS->MAX_METRIC_LEVEL; i++)
657 nextKey(t, i);
658 }
659
660 void rollMetric( uint64_t t ) {
661 for(int i = 0; i < levels.size(); i++) {
662 levels[i].rollMetric(t);
663 }
664 }
665
666 void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
667 MetricKeyRef fk = mk.withField(*this);
668 for(int j = 0; j < levels.size(); ++j) {
669 fk.level = j;
670 levels[j].flush(fk, rollTime, batch);
671 }
672 }
673
674 // Writes and Event metric field registration key
675 void registerField( const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
676 fieldKeys.push_back(mk.withField(*this).packFieldRegKey());
677 }
678 };
679
680 struct MakeEventField {
681 template <class Descriptor>
682 EventField<typename Descriptor::type, Descriptor> operator() (Descriptor) { return EventField<typename Descriptor::type, Descriptor>(); }
683 };
684
685 struct TimeDescriptor {
686 static StringRef name() { return LiteralStringRef("Time"); }
687 };
688
689 struct BaseMetric {
690 BaseMetric(MetricNameRef const &name) : metricName(name), pCollection(nullptr), registered(false), enabled(false) {
691 setConfig(false);
692 }
693 virtual ~BaseMetric() {
694 }
695
696 virtual void addref() = 0;
697 virtual void delref() = 0;
698
699 virtual void rollMetric(uint64_t t) = 0;
700
701 virtual void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
702 virtual void registerFields(const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys) {};
703
704 // Set the metric's config. An assert will fail if the metric is enabled before the metrics collection is available.
705 void setConfig(bool enable, int minLogLevel = 0) {
706 bool wasEnabled = enabled;
707 enabled = enable;
708 minLevel = minLogLevel;
709
710 if(enable && pCollection == nullptr) {
711 pCollection = TDMetricCollection::getTDMetrics();
712 ASSERT(pCollection != nullptr);
713 }
714
715 if(wasEnabled != enable) {
716 if(enabled) {
717 onEnable();
718 pCollection->metricEnabled.trigger();
719 }
720 else
721 onDisable();
722 }
723 }
724
725 // Callbacks for when metric is Enabled or Disabled.
726 // Metrics should verify their underlying storage on Enable because they could have been initially created
727 // at a time when the knobs were not initialized.
728 virtual void onEnable() = 0;
729 virtual void onDisable() {};
730
731 // Combines checking this metric's configured minimum level and any collection-wide throttling
732 // This should only be called after it is determined that a metric is enabled.
733 bool canLog(int level) {
734 return level >= minLevel && pCollection->canLog(level);
735 }
736
737 Standalone<MetricNameRef> metricName;
738
739 bool enabled; // The metric is currently logging data
740 int minLevel; // The minimum level that will be logged.
741
742 // All metrics need a pointer to their collection for performance reasons - every time a data point is logged
743 // canLog must be called which uses the collection's canLog to decide based on the metric write queue.
744 TDMetricCollection *pCollection;
745
746 // The metric has been registered in its current form (some metrics can change and require re-reg)
747 bool registered;
748 };
749
750 struct BaseEventMetric : BaseMetric {
751
752 BaseEventMetric(MetricNameRef const &name) : BaseMetric(name) {
753 }
754
755 // Needed for MetricUtil
756 static const StringRef metricType;
757 Void getValue() const {
758 return Void();
759 }
760 virtual ~BaseEventMetric() {}
761
762 // Every metric should have a set method for its underlying type in order for MetricUtil::getOrCreateInstance
763 // to initialize it. In the case of event metrics there is no underlying type so the underlying type
764 // is Void and set does nothing.
765 void set(Void const &val) {}
766
767 virtual StringRef getTypeName() = 0;
768 };
769
770 template <class E>
771 struct EventMetric : E, ReferenceCounted<EventMetric<E>>, MetricUtil<EventMetric<E>>, BaseEventMetric {
772 EventField<int64_t, TimeDescriptor> time;
773 bool latestRecorded;
774 decltype( tuple_map( MakeEventField(), typename Descriptor<E>::fields() ) ) values;
775
776 virtual void addref() { ReferenceCounted<EventMetric<E>>::addref(); }
777 virtual void delref() { ReferenceCounted<EventMetric<E>>::delref(); }
778
779 EventMetric( MetricNameRef const &name, Void) : BaseEventMetric(name), latestRecorded(false) {
780 }
781
782 virtual ~EventMetric() {
783 }
784
785 virtual StringRef getTypeName() { return Descriptor<E>::typeName(); }
786
787 void onEnable() {
788 // Must initialize fields, previously knobs may not have been set.
789 time.init();
790 initFields( typename Descriptor<E>::field_indexes());
791 }
792
793 // Log the event.
794 // Returns the time that was logged for the event so that it can be passed to other events that need to be time-sync'd.
795 // NOTE: Do NOT use the same time for two consecutive loggings of the SAME event. This *could* cause there to be metric data
796 // blocks such that the last timestamp of one block is equal to the first timestamp of the next, which means if a search is done
797 // for the exact timestamp then the first event will not be found.
798 uint64_t log(uint64_t explicitTime = 0) {
799 if(!enabled)
800 return 0;
801
802 uint64_t t = explicitTime ? explicitTime : timer_int();
803 double x = g_random->random01();
804
805 int64_t l = 0;
806 if (x == 0.0)
807 l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
808 else
809 l = std::min(FLOW_KNOBS->MAX_METRIC_LEVEL-1, (int64_t)(::log(1.0/x) / FLOW_KNOBS->METRIC_LEVEL_DIVISOR));
810
811 if(!canLog(l))
812 return 0;
813
814 bool overflow = false;
815 int64_t bytes = 0;
816 time.log(t, t, l, overflow, bytes);
817 logFields( typename Descriptor<E>::field_indexes(), t, l, overflow, bytes );
818 if(overflow) {
819 time.nextKey(t, l);
820 nextKeys(typename Descriptor<E>::field_indexes(), t, l);
821 }
822 latestRecorded = false;
823 return t;
824 }
825
826 template <size_t... Is>
827 void logFields(index_sequence<Is...>, uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
828 #ifdef NO_INTELLISENSE
829 auto _ = {
830 (std::get<Is>(values).log( std::tuple_element<Is, typename Descriptor<E>::fields>::type::get( static_cast<E&>(*this) ), t, l, overflow, bytes ), Void())...
831 };
832 #endif
833 }
834
835 template <size_t... Is>
836 void initFields(index_sequence<Is...>) {
837 #ifdef NO_INTELLISENSE
838 auto _ = {
839 (std::get<Is>(values).init(), Void())...
840 };
841 #endif
842 }
843
844 template <size_t... Is>
845 void nextKeys(index_sequence<Is...>, uint64_t t, int64_t l ) {
846 #ifdef NO_INTELLISENSE
847 auto _ = {
848 (std::get<Is>(values).nextKey(t, l),Void())...
849 };
850 #endif
851 }
852
853 virtual void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
854 time.flushField( mk, rollTime, batch );
855 flushFields( typename Descriptor<E>::field_indexes(), mk, rollTime, batch );
856 if(!latestRecorded) {
857 batch.updates.push_back(std::make_pair(mk.packLatestKey(), StringRef()));
858 latestRecorded = true;
859 }
860 }
861
862 template <size_t... Is>
863 void flushFields(index_sequence<Is...>, MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch ) {
864 #ifdef NO_INTELLISENSE
865 auto _ = {
866 (std::get<Is>(values).flushField( mk, rollTime, batch ),Void())...
867 };
868 #endif
869 }
870
871 virtual void rollMetric( uint64_t t ) {
872 time.rollMetric(t);
873 rollFields( typename Descriptor<E>::field_indexes(), t );
874 }
875
876 template <size_t... Is>
877 void rollFields(index_sequence<Is...>, uint64_t t ) {
878 #ifdef NO_INTELLISENSE
879 auto _ = {
880 (std::get<Is>(values).rollMetric( t ),Void())...
881 };
882 #endif
883 }
884
885 virtual void registerFields( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
886 time.registerField( mk, fieldKeys );
887 registerFields( typename Descriptor<E>::field_indexes(), mk, fieldKeys );
888 }
889
890 template <size_t... Is>
891 void registerFields(index_sequence<Is...>, const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
892 #ifdef NO_INTELLISENSE
893 auto _ = {
894 (std::get<Is>(values).registerField( mk, fieldKeys ),Void())...
895 };
896 #endif
897 }
898 protected:
899 bool it;
900 };
901
902 // A field Descriptor compatible with EventField but with name set at runtime
903 struct DynamicDescriptor {
904 DynamicDescriptor(const char *name)
905 : _name(StringRef((uint8_t *)name, strlen(name))) {}
906 StringRef name() const { return _name; }
907
908 private:
909 const Standalone<StringRef> _name;
910 };
911
912 template<typename T>
913 struct DynamicField;
914
915 struct DynamicFieldBase {
916 virtual ~DynamicFieldBase() {}
917
918 virtual StringRef fieldName() = 0;
919 virtual const StringRef getDerivedTypeName() = 0;
920 virtual void init() = 0;
921 virtual void clear() = 0;
922 virtual void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) = 0;
923 virtual void nextKey( uint64_t t, int level ) = 0;
924 virtual void nextKeyAllLevels( uint64_t t) = 0;
925 virtual void rollMetric( uint64_t t ) = 0;
926 virtual void flushField( MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
927 virtual void registerField( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) = 0;
928
929 // Set the current value of this field from the value of another
930 virtual void setValueFrom(DynamicFieldBase *src, StringRef eventType) = 0;
931
932 // Create a new field of the same type and with the same current value as this one and with the given name
933 virtual DynamicFieldBase * createNewWithValue(const char *name) = 0;
934
935 // This does a fairly cheap and "safe" downcast without using dynamic_cast / RTTI by checking that the pointer value
936 // of the const char * type string is the same as getDerivedTypeName for this object.
937 template<typename T> DynamicField<T> * safe_downcast(StringRef eventType) {
938 if(getDerivedTypeName() == metricTypeName<T>())
939 return (DynamicField<T> *)this;
940
941 TraceEvent(SevWarnAlways, "ScopeEventFieldTypeMismatch")
942 .detail("EventType", eventType.toString())
943 .detail("FieldName", fieldName().toString())
944 .detail("OldType", getDerivedTypeName().toString())
945 .detail("NewType", metricTypeName<T>().toString());
946 return NULL;
947 }
948 };
949
950 template<typename T>
951 struct DynamicField : public DynamicFieldBase, EventField<T, DynamicDescriptor> {
952 typedef EventField<T, DynamicDescriptor> EventFieldType;
953 DynamicField(const char *name) : DynamicFieldBase(), EventFieldType(DynamicDescriptor(name)), value(T()) {}
954 virtual ~DynamicField() {}
955
956 StringRef fieldName() { return EventFieldType::name(); }
957
958 // Get the field's datatype, this is used as a form of RTTI by DynamicFieldBase::safe_downcast()
959 const StringRef getDerivedTypeName() { return metricTypeName<T>(); }
960
961 // Pure virtual implementations
962 void clear() { value = T(); }
963
964 void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
965 return EventFieldType::log(value, t, l, overflow, bytes);
966 }
967
968 // Redirects to EventFieldType methods
969 void nextKey( uint64_t t, int level ) {
970 return EventFieldType::nextKey(t, level);
971 }
972 void nextKeyAllLevels( uint64_t t) {
973 return EventFieldType::nextKeyAllLevels(t);
974 }
975 void rollMetric( uint64_t t ) {
976 return EventFieldType::rollMetric(t);
977 }
978 void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
979 return EventFieldType::flushField(mk, rollTime, batch);
980 }
981 void registerField(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys) {
982 return EventFieldType::registerField(mk, fieldKeys);
983 }
984 void init() {
985 return EventFieldType::init();
986 }
987
988 // Set this field's value to the value of another field of exactly the same type.
989 void setValueFrom(DynamicFieldBase *src, StringRef eventType) {
990 DynamicField<T> *s = src->safe_downcast<T>(eventType);
991 if(s != NULL)
992 set(s->value);
993 else
994 clear(); // Not really necessary with proper use but just in case it is better to clear than use an old value.
995 }
996
997 DynamicFieldBase * createNewWithValue(const char *name) {
998 DynamicField<T> *n = new DynamicField<T>(name);
999 n->set(value);
1000 return n;
1001 }
1002
1003 // Non virtuals
1004 void set(T val) { value = val; }
1005
1006 private:
1007 T value;
1008 };
1009
1010 // A DynamicEventMetric is an EventMetric whose field set can be modified at runtime.
1011 struct DynamicEventMetric : ReferenceCounted<DynamicEventMetric>, MetricUtil<DynamicEventMetric>, BaseEventMetric {
1012 private:
1013 EventField<int64_t, TimeDescriptor> time;
1014 bool latestRecorded;
1015
1016 // TODO: A Standalone key type isn't ideal because on lookups a ref will be made Standalone just for the search
1017 // All fields that are set with setField will be in fields.
1018 std::map<Standalone<StringRef>, DynamicFieldBase *> fields;
1019
1020 // Set of fields not yet registered
1021 std::set<Standalone<StringRef> > fieldsToRegister;
1022
1023 // Whether or not new fields have been added since the last logging. fieldsToRegister can't
1024 // be used for this because registration is independent of actually logging data.
1025 bool newFields;
1026
1027 void newFieldAdded(Standalone<StringRef> const &fname) {
1028 fieldsToRegister.insert(fname); // So that this field will be registered when asked by the metrics logger actor later
1029 newFields = true; // So that log() will know that there is a new field
1030
1031 // Registration has now changed so set registered to false and trigger a reg change event if possible
1032 registered = false;
1033 if(pCollection != nullptr)
1034 pCollection->metricRegistrationChanged.trigger();
1035 }
1036
1037 public:
1038 DynamicEventMetric(MetricNameRef const &name, Void = Void());
1039 ~DynamicEventMetric();
1040
1041 virtual void addref() { ReferenceCounted<DynamicEventMetric>::addref(); }
1042 virtual void delref() { ReferenceCounted<DynamicEventMetric>::delref(); }
1043
1044 void onEnable() {
1045 // Must initialize fields, previously knobs may not have been set.
1046 // Note that future fields will be okay because the field constructor will init and the knobs will be set.
1047 time.init();
1048 for(auto f : fields)
1049 f.second->init();
1050 }
1051
1052 // Set (or create) a new field in the event
1053 template<typename ValueType>
1054 void setField(const char *fieldName, const ValueType &value) {
1055 StringRef fname((uint8_t *)fieldName, strlen(fieldName));
1056 DynamicFieldBase *&p = fields[fname];
1057 if (p != NULL) {
1058 // FIXME: This will break for DynamicEventMetric instances that are reused, such as use cases outside
1059 // of TraceEvents. Currently there are none in the code, and there may never any be but if you're here
1060 // because you reused a DynamicEventMetric and got the error below then this issue must be fixed. One
1061 // possible solution is to have a flag in DynamicEventMetric which enables this check so that
1062 // TraceEvent can preserve this behavior.
1063 TraceEvent(SevError, "DuplicateTraceProperty").detail("Property", fieldName).backtrace();
1064 if (g_network->isSimulated()) ASSERT(false);
1065 }
1066 p = new DynamicField<ValueType>(fieldName);
1067 if(pCollection != nullptr)
1068 p->init();
1069 newFieldAdded(fname);
1070
1071 // This will return NULL if the datatype is wrong.
1072 DynamicField<ValueType> *f = p->safe_downcast<ValueType>(getTypeName());
1073 // Only set the field value if the type is correct.
1074 // Another option here is to redefine the field to the new type and flush (roll) the existing field but that would create many keys
1075 // with small values in the db if two frequent events keep tug-of-war'ing the types back and forth.
1076 if(f != NULL)
1077 f->set(value);
1078 else
1079 p->clear(); // Not really necessary with proper use but just in case it is better to clear than use an old value.
1080 }
1081
1082 // This provides a way to first set fields in a temporary DynamicEventMetric and then push all of those field values
1083 // into another DynamicEventMetric (which is actually logging somewhere) and log the event.
1084 uint64_t setFieldsAndLogFrom(DynamicEventMetric *source, uint64_t explicitTime = 0) {
1085 for(auto f : source->fields)
1086 {
1087 DynamicFieldBase *&p = fields[f.first];
1088 if(p == NULL) {
1089 p = f.second->createNewWithValue(f.first.toString().c_str());
1090 if(pCollection != nullptr)
1091 p->init();
1092 newFieldAdded(f.first);
1093 }
1094 else
1095 p->setValueFrom(f.second, getTypeName());
1096 }
1097 return log(explicitTime);
1098 }
1099
1100 StringRef getTypeName() { return metricName.name; }
1101
1102 // Set all of the fields to their default values.
1103 void clearFields() {
1104 for(auto f : fields)
1105 f.second->clear();
1106 }
1107
1108 uint64_t log(uint64_t explicitTime = 0);
1109
1110 // Virtual function implementations
1111 void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch);
1112 void rollMetric( uint64_t t );
1113 void registerFields(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys);
1114 };
1115
1116 // Continuous metrics are a single-field metric using an EventField<TimeAndValue<T>>
1117 template <typename T>
1118 struct TimeAndValue {
1119 TimeAndValue() : time(0), value() {}
1120 int64_t time;
1121 T value;
1122
1123 // The metric field type for TimeAndValue is just the Value type.
1124 static inline const StringRef metric_field_type() { return metricTypeName<T>(); }
1125 };
1126
1127 // FieldHeader for continuous metrics, works for T = int, double, bool
1128 template <typename T>
1129 struct FieldHeader<TimeAndValue<T>> {
1130 FieldHeader() : version(1), count(0), area(0), previous_time(0) {}
1131 uint8_t version;
1132 int64_t count;
1133 // If T is a floating point type then area is a double, otherwise it's an int64_t
1134 typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type area;
1135 int64_t previous_time;
1136
1137 void update(FieldHeader const &h) {
1138 count += h.count;
1139 area += h.area;
1140 }
1141 void update(TimeAndValue<T> const &v) {
1142 ++count;
1143 if(previous_time > 0)
1144 area += v.value * (v.time - previous_time);
1145 previous_time = v.time;
1146 }
1147 template<class Ar> void serialize(Ar &ar) {
1148 serializer(ar, version);
1149 ASSERT(version == 1);
1150 serializer(ar, count, area);
1151 }
1152 };
1153
1154 template <> inline void FieldHeader<TimeAndValue<Standalone<StringRef>>>::update(TimeAndValue<Standalone<StringRef>> const &v) {
1155 ++count;
1156 area += v.value.size();
1157 }
1158
1159 // ValueBlock encoder/decoder for continuous metrics which have a type of TimeAndValue<T>
1160 // Uses encodings for int64_t and T and encodes (time, value, [time, value]...)
1161 template <typename T>
1162 struct FieldValueBlockEncoding<TimeAndValue<T>> {
1163 FieldValueBlockEncoding() : time_encoding(), value_encoding() {}
1164 inline void write(BinaryWriter &w, TimeAndValue<T> const &v) {
1165 time_encoding.write(w, v.time);
1166 value_encoding.write(w, v.value);
1167 }
1168 TimeAndValue<T> read(BinaryReader &r) {
1169 TimeAndValue<T> result;
1170 result.time = time_encoding.read(r);
1171 result.value = value_encoding.read(r);
1172 return result;
1173 }
1174 FieldValueBlockEncoding<int64_t> time_encoding;
1175 FieldValueBlockEncoding<T> value_encoding;
1176 };
1177
1178 // ValueBlock encoder/decoder specialization for continuous bool metrics because they are encoded
1179 // more efficiently than encoding the time and bool types separately.
1180 // Instead, time and value are combined to a single value (time delta << 1) + (value ? 1 : 0) and then
1181 // that value is encoded as a delta.
1182 template <>
1183 struct FieldValueBlockEncoding<TimeAndValue<bool>> {
1184 FieldValueBlockEncoding() : prev(), prev_combined(0) {}
1185 inline void write(BinaryWriter &w, TimeAndValue<bool> const &v) {
1186 int64_t combined = (v.time << 1) | (v.value ? 1 : 0);
1187 w << CompressedInt<int64_t>(combined - prev_combined);
1188 prev = v;
1189 prev_combined = combined;
1190 }
1191 TimeAndValue<bool> read(BinaryReader &r) {
1192 CompressedInt<int64_t> d;
1193 r >> d;
1194 prev_combined += d.value;
1195 prev.value = prev_combined & 1;
1196 prev.time = prev_combined << 1;
1197 return prev;
1198 }
1199 TimeAndValue<bool> prev;
1200 int64_t prev_combined;
1201 };
1202
1203 template <typename T>
1204 struct ContinuousMetric: NonCopyable, ReferenceCounted<ContinuousMetric<T>>, MetricUtil<ContinuousMetric<T>, T>, BaseMetric {
1205 // Needed for MetricUtil
1206 static const StringRef metricType;
1207
1208 private:
1209 EventField<TimeAndValue<T>> field;
1210 TimeAndValue<T> tv;
1211 bool recorded;
1212
1213 public:
1214 ContinuousMetric(MetricNameRef const &name, T const &initial)
1215 : BaseMetric(name), recorded(false) {
1216 tv.value = initial;
1217 }
1218
1219 virtual void addref() { ReferenceCounted<ContinuousMetric<T>>::addref(); }
1220 virtual void delref() { ReferenceCounted<ContinuousMetric<T>>::delref(); }
1221
1222 T getValue() const {
1223 return tv.value;
1224 }
1225
1226 void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
1227 if( !recorded ) {
1228 batch.updates.push_back(std::make_pair(mk.packLatestKey(), getLatestAsValue()));
1229 recorded = true;
1230 }
1231
1232 field.flushField(mk, rollTime, batch);
1233 }
1234
1235 void rollMetric(uint64_t t) {
1236 field.rollMetric(t);
1237 }
1238
1239 Standalone<StringRef> getLatestAsValue() {
1240 FieldValueBlockEncoding< TimeAndValue< T > > enc;
1241 BinaryWriter wr(AssumeVersion(currentProtocolVersion));
1242 // Write a header so the client can treat this value like a normal data value block.
1243 // TOOD: If it is useful, this could be the current header value of the most recently logged level.
1244 wr << FieldHeader<TimeAndValue<T>>();
1245 enc.write(wr, tv);
1246 return wr.toValue();
1247 }
1248
1249 void onEnable() {
1250 field.init();
1251 change();
1252 }
1253
1254 void onDisable() {
1255 change();
1256 }
1257
1258 void set(const T &v) {
1259 if(v != tv.value) {
1260 if(enabled)
1261 change();
1262 tv.value = v;
1263 }
1264 }
1265
1266 // requires += on T
1267 void add(const T &delta) {
1268 if(delta != T()) {
1269 if(enabled)
1270 change();
1271 tv.value += delta;
1272 }
1273 }
1274
1275 // requires ! on T
1276 void toggle() {
1277 if(enabled)
1278 change();
1279 tv.value = !tv.value;
1280 }
1281
1282 void change() {
1283 uint64_t toggleTime = timer_int();
1284 int64_t bytes = 0;
1285
1286 if(tv.time != 0) {
1287 double x = g_random->random01();
1288
1289 int64_t l = 0;
1290 if (x == 0.0)
1291 l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
1292 else if (toggleTime != tv.time)
1293 l = std::min(
1294 FLOW_KNOBS->MAX_METRIC_LEVEL-1,
1295 (int64_t)(
1296 log((toggleTime - tv.time) / x) /
1297 FLOW_KNOBS->METRIC_LEVEL_DIVISOR
1298 )
1299 );
1300
1301 if(!canLog(l))
1302 return;
1303
1304 bool overflow = false;
1305 field.log(tv, tv.time, l, overflow, bytes);
1306 if(overflow)
1307 field.nextKey(toggleTime, l);
1308 }
1309 tv.time = toggleTime;
1310 recorded = false;
1311 TDMetricCollection::getTDMetrics()->checkRoll(tv.time, bytes);
1312 }
1313 };
1314
1315 typedef ContinuousMetric<int64_t> Int64Metric;
1316 typedef Int64Metric VersionMetric;
1317 typedef ContinuousMetric<bool> BoolMetric;
1318 typedef ContinuousMetric<Standalone<StringRef>> StringMetric;
1319
1320 // MetricHandle / EventMetricHandle are wrappers for a Reference<MetricType> which provides
1321 // the following interface conveniences
1322 //
1323 // * The underlying metric reference is always initialized to a valid object. That valid object
1324 // may not actually be in a metric collection and therefore may not actually be able to write
1325 // data to a database, but it will work in other ways (i.e. int metrics will act like integers).
1326 //
1327 // * Operator =, ++, --, +=, and -= can be used as though the handle is an object of the MetricType::ValueType of
1328 // the metric type for which it is a handle.
1329 //
1330 // * Operator -> is defined such that the MetricHandle acts like a pointer to the underlying MetricType
1331 //
1332 // * Cast operator to MetricType::ValueType is defined so that the handle will act like a MetricType::ValueType
1333 //
1334 // * The last three features allow, for example, a MetricHandle<Int64Metric> to be a drop-in replacement for an int64_t.
1335 //
1336 template <typename T>
1337 struct MetricHandle {
1338 template<typename ValueType = typename T::ValueType>
1339 MetricHandle(StringRef const &name = StringRef(), StringRef const &id = StringRef(), ValueType const &initial = ValueType())
1340 : ref(T::getOrCreateInstance(name, id, true, initial)) {
1341 }
1342
1343 // Initialize this handle to point to a new or existing metric with (name, id). If a new metric is created then the handle's
1344 // current metric's current value will be the new metric's initial value. This allows Metric handle users to treate their
1345 // Metric variables as normal variables and then bind them to actual logging metrics later while continuing with the current value.
1346 void init(StringRef const &name, StringRef const &id = StringRef()) {
1347 ref = T::getOrCreateInstance(name, id, true, ref->getValue());
1348 }
1349
1350 void init(StringRef const &name, StringRef const &id, typename T::ValueType const &initial) {
1351 ref = T::getOrCreateInstance(name, id, true, initial);
1352 }
1353
1354 void operator=(typename T::ValueType const &v) {
1355 ref->set(v);
1356 }
1357 void operator++() {
1358 ref->add(1);
1359 }
1360 void operator++(int) {
1361 ref->add(1);
1362 }
1363 void operator--() {
1364 ref->add(-1);
1365 }
1366 void operator--(int) {
1367 ref->add(-1);
1368 }
1369 void operator+=(typename T::ValueType const &v) {
1370 ref->add(v);
1371 }
1372 void operator-=(typename T::ValueType const &v) {
1373 ref->add(-v);
1374 }
1375
1376 T * operator-> () { return ref.getPtr(); }
1377
1378 operator typename T::ValueType () const { return ref->getValue(); }
1379 typename T::ValueType getValue() const { return ref->getValue(); }
1380
1381 Reference<T> ref;
1382 };
1383
1384 template<class T>
1385 struct Traceable<MetricHandle<T>> : Traceable<typename T::ValueType> {
1386 static std::string toString(const MetricHandle<T>& value) {
1387 return Traceable<typename T::ValueType>::toString(value.getValue());
1388 }
1389 };
1390
1391 template<class T>
1392 struct SpecialTraceMetricType<MetricHandle<T>> : SpecialTraceMetricType<typename T::ValueType> {
1393 using parent = SpecialTraceMetricType<typename T::ValueType>;
1394 static auto getValue(const MetricHandle<T>& value) -> decltype(parent::getValue(value.getValue())) {
1395 return parent::getValue(value.getValue());
1396 }
1397 };
1398
1399 typedef MetricHandle<Int64Metric> Int64MetricHandle;
1400 typedef MetricHandle<VersionMetric> VersionMetricHandle;
1401 typedef MetricHandle<BoolMetric> BoolMetricHandle;
1402 typedef MetricHandle<StringMetric> StringMetricHandle;
1403
1404 template <typename E>
1405 using EventMetricHandle = MetricHandle<EventMetric<E>>;
1406
1407 #include "flow/unactorcompiler.h"
1408
1409 #endif
1410