1 /*
2  * TDMetric.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_TDMETRIC_ACTOR_G_H)
25         #define FLOW_TDMETRIC_ACTOR_G_H
26         #include "flow/TDMetric.actor.g.h"
27 #elif !defined(FLOW_TDMETRIC_ACTOR_H)
28         #define FLOW_TDMETRIC_ACTOR_H
29 
30 #include "flow/flow.h"
31 #include "flow/IndexedSet.h"
32 #include "flow/network.h"
33 #include "flow/Knobs.h"
34 #include "flow/genericactors.actor.h"
35 #include "flow/CompressedInt.h"
36 #include <algorithm>
37 #include <functional>
38 #include <cmath>
39 #include "flow/actorcompiler.h"  // This must be the last #include.
40 
41 struct MetricNameRef {
MetricNameRefMetricNameRef42 	MetricNameRef() {}
MetricNameRefMetricNameRef43 	MetricNameRef(const StringRef& type, const StringRef& name, const StringRef &id)
44 	  : type(type), name(name), id(id) {
45 	}
MetricNameRefMetricNameRef46 	MetricNameRef(Arena& a, const MetricNameRef& copyFrom)
47 	  : type(a, copyFrom.type), name(a, copyFrom.name), id(a, copyFrom.id) {
48 	}
49 
50 	StringRef type, name, id;
51 
toStringMetricNameRef52 	std::string toString() const {
53 		return format("(%s,%s,%s,%s)", type.toString().c_str(), name.toString().c_str(), id.toString().c_str());
54 	}
55 
expectedSizeMetricNameRef56 	int expectedSize() const {
57 		return type.expectedSize() + name.expectedSize();
58 	}
59 };
60 
61 extern std::string reduceFilename(std::string const &filename);
62 inline bool operator < (const MetricNameRef& l, const MetricNameRef& r ) {
63 	int cmp = l.type.compare(r.type);
64 	if(cmp == 0) {
65 		cmp = l.name.compare(r.name);
66 		if(cmp == 0)
67 			cmp = l.id.compare(r.id);
68 	}
69 	return cmp < 0;
70 }
71 
72 inline bool operator == (const MetricNameRef& l, const MetricNameRef& r ) {
73 	return l.type == r.type && l.name == r.name && l.id == r.id;
74 }
75 
76 inline bool operator != (const MetricNameRef& l, const MetricNameRef& r ) {
77 	return !(l == r);
78 }
79 
80 struct KeyWithWriter {
81 	Standalone<StringRef> key;
82 	BinaryWriter writer;
83 	int writerOffset;
84 
keyKeyWithWriter85 	KeyWithWriter( Standalone<StringRef> const& key, BinaryWriter& writer, int writerOffset = 0) : key(key), writer(std::move(writer)), writerOffset(writerOffset) {}
KeyWithWriterKeyWithWriter86 	KeyWithWriter( KeyWithWriter&& r ) : key(std::move(r.key)), writer(std::move(r.writer)), writerOffset(r.writerOffset) {}
87 	void operator=( KeyWithWriter&& r ) { key = std::move(r.key); writer = std::move(r.writer); writerOffset = r.writerOffset; }
88 
valueKeyWithWriter89 	StringRef value() {
90 		return StringRef(writer.toValue().substr(writerOffset));
91 	}
92 };
93 
94 // This is a very minimal interface for getting metric data from the DB which is needed
95 // to support continuing existing metric data series.
96 // It's lack of generality is intentional.
97 class IMetricDB {
98 public:
~IMetricDB()99 	virtual ~IMetricDB() {}
100 
101 	// key should be the result of calling metricKey or metricFieldKey with time = 0
102 	virtual Future<Optional<Standalone<StringRef>>> getLastBlock(Standalone<StringRef> key) = 0;
103 };
104 
105 // Key generator for metric keys for various things.
106 struct MetricKeyRef {
MetricKeyRefMetricKeyRef107 	MetricKeyRef() : level(-1) {}
MetricKeyRefMetricKeyRef108 	MetricKeyRef(Arena& a, const MetricKeyRef& copyFrom)
109 	  : prefix(a, copyFrom.prefix), name(a, copyFrom.name), address(a, copyFrom.address),
110 	    fieldName(a, copyFrom.fieldName), fieldType(a, copyFrom.fieldType), level(copyFrom.level) {
111 	}
112 
113 	StringRef prefix;
114 	MetricNameRef name;
115 	StringRef address;
116 	StringRef fieldName;
117 	StringRef fieldType;
118 	uint64_t level;
119 
expectedSizeMetricKeyRef120 	int expectedSize() const {
121 		return prefix.expectedSize() + name.expectedSize() + address.expectedSize() + fieldName.expectedSize() + fieldType.expectedSize();
122 	}
123 
withFieldMetricKeyRef124 	template <typename T> inline MetricKeyRef withField(const T &field) const {
125 		MetricKeyRef mk(*this);
126 		mk.fieldName = field.name();
127 		mk.fieldType = field.typeName();
128 		return mk;
129 	}
130 
131 	const Standalone<StringRef> packLatestKey() const;
132 	const Standalone<StringRef> packDataKey(int64_t time = -1) const;
133 	const Standalone<StringRef> packFieldRegKey() const;
134 
isFieldMetricKeyRef135 	bool isField() const { return fieldName.size() > 0 && fieldType.size() > 0; }
136 	void writeField(BinaryWriter &wr) const;
137 	void writeMetricName(BinaryWriter &wr) const;
138 };
139 
140 struct MetricUpdateBatch {
141 	std::vector<KeyWithWriter> inserts;
142 	std::vector<KeyWithWriter> appends;
143 	std::vector<std::pair<Standalone<StringRef>,Standalone<StringRef>>> updates;
144 	std::vector<std::function<Future<Void>(IMetricDB *, MetricUpdateBatch *)>> callbacks;
145 
clearMetricUpdateBatch146 	void clear() {
147 		inserts.clear();
148 		appends.clear();
149 		updates.clear();
150 		callbacks.clear();
151 	}
152 };
153 
154 template<typename T>
metricTypeName()155 inline const StringRef metricTypeName() {
156 	// If this function does not compile then T is not a supported metric type
157 	return T::metric_field_type();
158 }
159 #define MAKE_TYPENAME(T, S) template<> inline const StringRef metricTypeName<T>() { return LiteralStringRef(S); }
160 MAKE_TYPENAME(bool, "Bool")
161 MAKE_TYPENAME(int64_t, "Int64")
162 MAKE_TYPENAME(double, "Double")
163 MAKE_TYPENAME(Standalone<StringRef>, "String")
164 #undef MAKE_TYPENAME
165 
166 struct BaseMetric;
167 
168 // The collection of metrics that exist for a single process, at a single address.
169 class TDMetricCollection {
170 public:
TDMetricCollection()171 	TDMetricCollection() : currentTimeBytes(0) {}
172 
173 	// Metric Name to reference to its instance
174 	Map<Standalone<MetricNameRef>, Reference<BaseMetric>, MapPair<Standalone<MetricNameRef>, Reference<BaseMetric>>, int> metricMap;
175 
176 	AsyncTrigger metricAdded;
177 	AsyncTrigger metricEnabled;
178 	AsyncTrigger metricRegistrationChanged;
179 
180 	// Initialize the collection.  Once this returns true, metric data can be written to a database.  Note that metric data can be logged
181 	// before that time, just not written to a database.
init()182 	bool init() {
183 		// Get and store the local address in the metric collection, but only if it is not 0.0.0.0:0
184 		if( address.size() == 0 ) {
185 			NetworkAddress addr = g_network->getLocalAddress();
186 			if (addr.ip.isValid() && addr.port != 0) address = StringRef(addr.toString());
187 		}
188 		return address.size() != 0;
189 	}
190 
191 	// Returns the TDMetrics that the calling process should use
getTDMetrics()192 	static TDMetricCollection* getTDMetrics() {
193 		if(g_network == nullptr)
194 			return nullptr;
195 		return static_cast<TDMetricCollection*>((void*) g_network->global(INetwork::enTDMetrics));
196 	}
197 
198 	Deque<uint64_t> rollTimes;
199 	int64_t currentTimeBytes;
200 	Standalone<StringRef> address;
201 
202 	void checkRoll(uint64_t t, int64_t usedBytes);
203 	bool canLog(int level);
204 };
205 
206 struct MetricData {
207 	uint64_t start;
208 	uint64_t rollTime;
209 	uint64_t appendStart;
210 	BinaryWriter writer;
211 
212 	explicit MetricData(uint64_t appendStart = 0) :
writerMetricData213 		writer(AssumeVersion(currentProtocolVersion)),
214 		start(0),
215 		rollTime(std::numeric_limits<uint64_t>::max()),
216 		appendStart(appendStart) {
217 	}
218 
MetricDataMetricData219 	MetricData( MetricData&& r ) BOOST_NOEXCEPT :
220 		start(r.start),
221 		rollTime(r.rollTime),
222 		appendStart(r.appendStart),
223 		writer(std::move(r.writer)) {
224 	}
225 
226 	void operator=( MetricData&& r ) BOOST_NOEXCEPT {
227 		start = r.start; rollTime = r.rollTime; appendStart = r.appendStart; writer = std::move(r.writer);
228 	}
229 
230 	std::string toString();
231 };
232 
233 // Some common methods to reduce code redundancy across different metric definitions
234 template<typename T, typename _ValueType = Void>
235 struct MetricUtil {
236 	typedef _ValueType ValueType;
237 	typedef T MetricType;
238 
239 	// Looks up a metric by name and id and returns a reference to it if it exists.
240 	// Empty names will not be looked up.
241 	// If create is true then a metric will be created with the given initial value if one could not be found to return.
242 	// If a metric is created and name is not empty then the metric will be placed in the collection.
243 	static Reference<T> getOrCreateInstance(StringRef const& name, StringRef const &id = StringRef(), bool create = false, ValueType initial = ValueType()) {
244 		Reference<T> m;
245 		TDMetricCollection *collection = TDMetricCollection::getTDMetrics();
246 
247 		// If there is a metric collect and this metric has a name then look it up in the collection
248 		bool useMap = collection != nullptr && name.size() > 0;
249 		MetricNameRef mname;
250 
251 		if(useMap) {
252 			mname = MetricNameRef(T::metricType, name, id);
253 			auto mi = collection->metricMap.find(mname);
254 			if(mi != collection->metricMap.end()) {
255 				m = mi->value.castTo<T>();
256 			}
257 		}
258 
259 		// If we don't have a valid metric reference yet and the create flag was set then create one and possibly put it in the map
260 		if(!m && create) {
261 			// Metric not found in collection but create is set then create it in the map
262 			m = Reference<T>(new T(mname, initial));
263 			if(useMap) {
264 				collection->metricMap[mname] = m.template castTo<BaseMetric>();
265 				collection->metricAdded.trigger();
266 			}
267 		}
268 
269 		return m;
270 	}
271 
272 	// Lookup the T metric by name and return its value (or nullptr if it doesn't exist)
lookupMetricMetricUtil273 	static T * lookupMetric(MetricNameRef const &name) {
274 		auto it = T::metricMap().find(name);
275 		if(it != T::metricMap().end())
276 			return it->value;
277 		return nullptr;
278 	}
279 };
280 
281 // index_sequence implementation since VS2013 doesn't have it yet
282 template <size_t... Ints> class index_sequence {
283 public:
size()284 	static size_t size() { return sizeof...(Ints); }
285 };
286 
287 template <size_t Start, typename Indices, size_t End>
288 struct make_index_sequence_impl;
289 
290 template <size_t Start, size_t... Indices, size_t End>
291 struct make_index_sequence_impl<Start, index_sequence<Indices...>, End> {
292 	typedef typename make_index_sequence_impl<
293 		Start + 1, index_sequence<Indices..., Start>, End>::type type;
294 };
295 
296 template <size_t End, size_t... Indices>
297 struct make_index_sequence_impl<End, index_sequence<Indices...>, End> {
298 	typedef index_sequence<Indices...> type;
299 };
300 
301 // The code that actually implements tuple_map
302 template <size_t I, typename F, typename... Tuples>
303 auto tuple_zip_invoke(F f, const Tuples &... ts) -> decltype( f(std::get<I>(ts)...) ) {
304 	return f(std::get<I>(ts)...);
305 }
306 
307 template <typename F, size_t... Is, typename... Tuples>
308 auto tuple_map_impl(F f, index_sequence<Is...>, const Tuples &... ts) -> decltype( std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...) ) {
309 	return std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...);
310 }
311 
312 // tuple_map( f(a,b), (a1,a2,a3), (b1,b2,b3) ) = (f(a1,b1), f(a2,b2), f(a3,b3))
313 template <typename F, typename Tuple, typename... Tuples>
314 auto tuple_map(F f, const Tuple &t, const Tuples &... ts) -> decltype( tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...) ) {
315 	return tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...);
316 }
317 
318 template <class T>
319 struct Descriptor {
320 #ifndef NO_INTELLISENSE
321 	using fields = std::tuple<>;
322 	typedef make_index_sequence_impl<0, index_sequence<>, std::tuple_size<fields>::value>::type field_indexes;
323 
324 	static StringRef typeName() {{ return LiteralStringRef(""); }}
325 #endif
326 };
327 
328 // FieldHeader is a serializable (FIXED SIZE!) and updatable Header type for Metric field levels.
329 // Update is via += with either a T or another FieldHeader
330 // Default implementation is sufficient for ints and doubles
331 template<typename T>
332 struct FieldHeader {
333 	FieldHeader() : version(1), count(0), sum(0) {}
334 	uint8_t version;
335 	int64_t count;
336 	// sum is a T if T is arithmetic, otherwise it's an int64_t
337 	typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type sum;
338 
339 	void update(FieldHeader const &h) {
340 		count += h.count;
341 		sum += h.sum;
342 	}
343 	void update(T const &v) {
344 		++count;
345 		sum += v;
346 	}
347 	template<class Ar> void serialize(Ar &ar) {
348 		serializer(ar, version);
349 		ASSERT(version == 1);
350 		serializer(ar, count, sum);
351 	}
352 };
353 
354 template <> inline void FieldHeader<Standalone<StringRef>>::update(Standalone<StringRef> const &v) {
355 	++count;
356 	sum += v.size();
357 }
358 
359 // FieldValueBlockEncoding is a class for reading and writing encoded field values to and from field
360 // value data blocks.  Note that an implementation can be stateful.
361 // Proper usage requires that a single Encoding instance is used to either write all field values to a metric
362 // data block or to read all field values from a metric value block.  This usage pattern enables enables
363 // encoding and decoding values as deltas from previous values.
364 //
365 // The default implementation works for ints and writes delta from the previous value.
366 template <typename T>
367 struct FieldValueBlockEncoding {
368 	FieldValueBlockEncoding() : prev(0) {}
369 	inline void write(BinaryWriter &w, T v) {
370 		w << CompressedInt<T>(v - prev);
371 		prev = v;
372 	}
373 	T read(BinaryReader &r) {
374 		CompressedInt<T> v;
375 		r >> v;
376 		prev += v.value;
377 		return prev;
378 	}
379 	T prev;
380 };
381 
382 template <>
383 struct FieldValueBlockEncoding<double> {
384 	inline void write(BinaryWriter &w, double v) {
385 		w << v;
386 	}
387 	double read(BinaryReader &r) {
388 		double v;
389 		r >> v;
390 		return v;
391 	}
392 };
393 
394 template <>
395 struct FieldValueBlockEncoding<bool> {
396 	inline void write(BinaryWriter &w, bool v) {
397 		w.serializeBytes( v ? LiteralStringRef("\x01") : LiteralStringRef("\x00") );
398 	}
399 	bool read(BinaryReader &r) {
400 		uint8_t *v = (uint8_t *)r.readBytes(sizeof(uint8_t));
401 		return *v != 0;
402 	}
403 };
404 
405 // Encoder for strings, writes deltas
406 template <>
407 struct FieldValueBlockEncoding<Standalone<StringRef>> {
408 	inline void write(BinaryWriter &w, Standalone<StringRef> const &v) {
409 		int reuse = 0;
410 		int stop = std::min(v.size(), prev.size());
411 		while(reuse < stop && v[reuse] == prev[reuse])
412 			++reuse;
413 		w << CompressedInt<int>(reuse) << CompressedInt<int>(v.size() - reuse);
414 		if(v.size() > reuse)
415 			w.serializeBytes(v.substr(reuse));
416 		prev = v;
417 	}
418 	Standalone<StringRef> read(BinaryReader &r) {
419 		CompressedInt<int> reuse;
420 		CompressedInt<int> extra;
421 		r >> reuse >> extra;
422 		ASSERT(reuse.value >= 0 && extra.value >= 0 && reuse.value <= prev.size());
423 		Standalone<StringRef> v = makeString(reuse.value + extra.value);
424 		memcpy(mutateString(v),               prev.begin(),             reuse.value);
425 		memcpy(mutateString(v) + reuse.value, r.readBytes(extra.value), extra.value);
426 		prev = v;
427 		return v;
428 	}
429 	// Using a Standalone<StringRef> for prev is efficient for writing but not great for reading.
430 	Standalone<StringRef> prev;
431 };
432 
433 
434 // Field level for value type of T using header type of Header.  Default header type is the default FieldHeader implementation for type T.
435 template <class T, class Header = FieldHeader<T>, class Encoder = FieldValueBlockEncoding<T>>
436 struct FieldLevel {
437 
438 	Deque<MetricData> metrics;
439 	int64_t appendUsed;
440 	Header header;
441 
442 	// The previous header and the last timestamp at which an out going MetricData block requires header patching
443 	Optional<Header> previousHeader;
444 	uint64_t lastTimeRequiringHeaderPatch;
445 
446 	Encoder enc;
447 
448 	explicit FieldLevel() : appendUsed(0) {
449 		metrics.emplace_back(MetricData());
450 		metrics.back().writer << header;
451 	}
452 
453 	FieldLevel(FieldLevel &&f)
454 	  : metrics(std::move(f.metrics)), appendUsed(f.appendUsed), enc(f.enc), header(f.header),
455 	    previousHeader(f.previousHeader), lastTimeRequiringHeaderPatch(f.lastTimeRequiringHeaderPatch) {
456 	}
457 
458 	// update Header, use Encoder to write T v
459 	void log( T v, uint64_t t, bool& overflow, int64_t& bytes ) {
460 		int lastLength = metrics.back().writer.getLength();
461 		if( metrics.back().start == 0 )
462 			metrics.back().start = t;
463 
464 		header.update(v);
465 		enc.write(metrics.back().writer, v);
466 
467 		bytes += metrics.back().writer.getLength() - lastLength;
468 		if(lastLength + appendUsed > FLOW_KNOBS->MAX_METRIC_SIZE)
469 			overflow = true;
470 	}
471 
472 	void nextKey( uint64_t t ) {
473 		// If nothing has actually been written to the current block, don't add a new block,
474 		// just modify this one if needed so that the next log call will set the ts for this block.
475 		auto &m = metrics.back();
476 		if(m.start == 0 && m.appendStart == 0)
477 			return;
478 
479 		// This block would have appended but had no data so just reset it to a non-append block instead of adding a new one
480 		if(m.appendStart != 0 && m.writer.getLength() == 0) {
481 			m.appendStart = 0;
482 			m.writer << header;
483 			enc = Encoder();
484 			return;
485 		}
486 
487 		metrics.back().rollTime = t;
488 		metrics.emplace_back(MetricData());
489 		metrics.back().writer << header;
490 		enc = Encoder();
491 		appendUsed = 0;
492 	}
493 
494 	void rollMetric( uint64_t t ) {
495 		ASSERT(metrics.size());
496 
497 		if(metrics.back().start) {
498 			metrics.back().rollTime = t;
499 			appendUsed += metrics.back().writer.getLength();
500 			if(metrics.back().appendStart)
501 				metrics.emplace_back(MetricData(metrics.back().appendStart));
502 			else
503 				metrics.emplace_back(MetricData(metrics.back().start));
504 		}
505 	}
506 
507 	// Calculate header as of the end of a value block
508 	static Header calculateHeader(StringRef block) {
509 		BinaryReader r(block, AssumeVersion(currentProtocolVersion));
510 		Header h;
511 		r >> h;
512 		Encoder dec;
513 		while(!r.empty()) {
514 			T v = dec.read(r);
515 			h.update(v);
516 		}
517 		return h;
518 	}
519 
520 	// Read header at position, update it with previousHeader, overwrite old header with new header.
521 	static void updateSerializedHeader(StringRef buf, const Header &patch) {
522 		BinaryReader r(buf, AssumeVersion(currentProtocolVersion));
523 		Header h;
524 		r >> h;
525 		h.update(patch);
526 		OverWriter w(mutateString(buf), buf.size(), AssumeVersion(currentProtocolVersion));
527 		w << h;
528 	}
529 
530 	// Flushes data blocks in metrics to batch, optionally patching headers if a header is given
531 	void flushUpdates(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
532 		while(metrics.size()) {
533 			auto& data = metrics.front();
534 
535 			if(data.start != 0 && data.rollTime <= rollTime) {
536 				// If this data is to be appended, write it to the batch now.
537 				if( data.appendStart ) {
538 					batch.appends.push_back(KeyWithWriter(mk.packDataKey(data.appendStart), data.writer));
539 				} else {
540 					// Otherwise, insert but first, patch the header if this block is old enough
541 					if(data.rollTime <= lastTimeRequiringHeaderPatch) {
542 						ASSERT(previousHeader.present());
543 						FieldLevel<T>::updateSerializedHeader(data.writer.toValue(), previousHeader.get());
544 					}
545 
546 					batch.inserts.push_back(KeyWithWriter(mk.packDataKey(data.start), data.writer));
547 				}
548 
549 				if(metrics.size() == 1) {
550 					rollMetric(data.rollTime);
551 					metrics.pop_front();
552 					break;
553 				}
554 
555 				metrics.pop_front();
556 			}
557 			else
558 				break;
559 		}
560 	}
561 
562 	ACTOR static Future<Void> updatePreviousHeader(FieldLevel *self, IMetricDB *db, Standalone<MetricKeyRef> mk, uint64_t rollTime, MetricUpdateBatch *batch) {
563 
564 		Optional<Standalone<StringRef>> block = wait(db->getLastBlock(mk.packDataKey(-1)));
565 
566 		// If the block is present, use it
567 		if(block.present()) {
568 			// Calculate the previous data's final header value
569 			Header oldHeader = calculateHeader(block.get());
570 
571 			// Set the previous header in self to this header for us in patching outgoing blocks
572 			self->previousHeader = oldHeader;
573 
574 			// Update the header in self so the next new block created will be current
575 			self->header.update(oldHeader);
576 
577 			// Any blocks already in the metrics queue will need to be patched at the time that they are
578 			// flushed to the DB (which isn't necessarity part of the current flush) so set the last time
579 			// that requires a patch to the time of the last MetricData in the queue
580 			self->lastTimeRequiringHeaderPatch = self->metrics.back().rollTime;
581 		}
582 		else {
583 			// Otherwise, there is no previous header so no headers need to be updated at all ever.
584 			// Set the previous header to an empty header so that flush() sees that this process
585 			// has already finished, and set lastTimeRequiringHeaderPatch to 0 since no blocks ever need to be patched.
586 			self->previousHeader = Header();
587 			self->lastTimeRequiringHeaderPatch = 0;
588 		}
589 
590 		// Now flush the level data up to the rollTime argument and patch anything older than lastTimeRequiringHeaderPatch
591 		self->flushUpdates(mk, rollTime, *batch);
592 
593 		return Void();
594 	}
595 
596 	// Flush this level's data to the output batch.
597 	// This function must NOT be called again until any callbacks added to batch have been completed.
598 	void flush(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
599 		// Don't do anything if there is no data in the queue to flush.
600 		if(metrics.empty() || metrics.front().start == 0)
601 			return;
602 
603 		// If the previous header is present then just call flushUpdates now.
604 		if(previousHeader.present())
605 			return flushUpdates(mk, rollTime, batch);
606 
607 		Standalone<MetricKeyRef> mkCopy = mk;
608 
609 		// Previous header is not present so queue a callback which will update it
610 		batch.callbacks.push_back([=](IMetricDB *db, MetricUpdateBatch *batch) mutable -> Future<Void> {
611 			return updatePreviousHeader(this, db, mkCopy, rollTime, batch);
612 		});
613 
614 	}
615 };
616 
617 // A field Description to be used for continuous metrics, whose field name and type should never be accessed
618 struct NullDescriptor {
619 	static StringRef name() { return StringRef(); }
620 };
621 
622 // Descriptor must have the methods name() and typeName().  They can be either static or member functions (such as for runtime configurability).
623 // Descriptor is inherited so that syntatically Descriptor::fn() works in either case and so that an empty Descriptor with static methods
624 // will take up 0 space.  EventField() accepts an optional Descriptor instance.
625 template <class T, class Descriptor = NullDescriptor, class FieldLevelType = FieldLevel<T>>
626 struct EventField : public Descriptor {
627 	std::vector<FieldLevelType> levels;
628 
629 	EventField( EventField&& r ) BOOST_NOEXCEPT : Descriptor(r), levels(std::move(r.levels)) {}
630 
631 	void operator=( EventField&& r ) BOOST_NOEXCEPT {
632 		levels = std::move(r.levels);
633 	}
634 
635 	EventField(Descriptor d = Descriptor()) : Descriptor(d) {
636 	}
637 
638 	static StringRef typeName() { return metricTypeName<T>(); }
639 
640 	void init() {
641 		if(levels.size() != FLOW_KNOBS->MAX_METRIC_LEVEL) {
642 			levels.clear();
643 			levels.resize(FLOW_KNOBS->MAX_METRIC_LEVEL);
644 		}
645 	}
646 
647 	void log( T v, uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) {
648 		return levels[l].log(v, t, overflow, bytes);
649 	}
650 
651 	void nextKey( uint64_t t, int level ) {
652 		levels[level].nextKey(t);
653 	}
654 
655 	void nextKeyAllLevels( uint64_t t ) {
656 		for(int64_t i = 0; i < FLOW_KNOBS->MAX_METRIC_LEVEL; i++)
657 			nextKey(t, i);
658 	}
659 
660 	void rollMetric( uint64_t t ) {
661 		for(int i = 0; i < levels.size(); i++) {
662 			levels[i].rollMetric(t);
663 		}
664 	}
665 
666 	void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
667 		MetricKeyRef fk = mk.withField(*this);
668 		for(int j = 0; j < levels.size(); ++j) {
669 			fk.level = j;
670 			levels[j].flush(fk, rollTime, batch);
671 		}
672 	}
673 
674 	// Writes and Event metric field registration key
675 	void registerField( const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
676 		fieldKeys.push_back(mk.withField(*this).packFieldRegKey());
677 	}
678 };
679 
680 struct MakeEventField {
681 	template <class Descriptor>
682 	EventField<typename Descriptor::type, Descriptor> operator() (Descriptor) { return EventField<typename Descriptor::type, Descriptor>(); }
683 };
684 
685 struct TimeDescriptor {
686 	static StringRef name() { return LiteralStringRef("Time"); }
687 };
688 
689 struct BaseMetric {
690 	BaseMetric(MetricNameRef const &name) : metricName(name), pCollection(nullptr), registered(false), enabled(false) {
691 		setConfig(false);
692 	}
693 	virtual ~BaseMetric() {
694 	}
695 
696 	virtual void addref() = 0;
697 	virtual void delref() = 0;
698 
699 	virtual void rollMetric(uint64_t t) = 0;
700 
701 	virtual void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
702 	virtual void registerFields(const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys) {};
703 
704 	// Set the metric's config.  An assert will fail if the metric is enabled before the metrics collection is available.
705 	void setConfig(bool enable, int minLogLevel = 0) {
706 		bool wasEnabled = enabled;
707 		enabled = enable;
708 		minLevel = minLogLevel;
709 
710 		if(enable && pCollection == nullptr) {
711 			pCollection = TDMetricCollection::getTDMetrics();
712 			ASSERT(pCollection != nullptr);
713 		}
714 
715 		if(wasEnabled != enable) {
716 			if(enabled) {
717 				onEnable();
718 				pCollection->metricEnabled.trigger();
719 			}
720 			else
721 				onDisable();
722 		}
723 	}
724 
725 	// Callbacks for when metric is Enabled or Disabled.
726 	// Metrics should verify their underlying storage on Enable because they could have been initially created
727 	// at a time when the knobs were not initialized.
728 	virtual void onEnable() = 0;
729 	virtual void onDisable() {};
730 
731 	// Combines checking this metric's configured minimum level and any collection-wide throttling
732 	// This should only be called after it is determined that a metric is enabled.
733 	bool canLog(int level) {
734 		return level >= minLevel && pCollection->canLog(level);
735 	}
736 
737 	Standalone<MetricNameRef> metricName;
738 
739 	bool enabled;  // The metric is currently logging data
740 	int minLevel;  // The minimum level that will be logged.
741 
742 	// All metrics need a pointer to their collection for performance reasons - every time a data point is logged
743 	// canLog must be called which uses the collection's canLog to decide based on the metric write queue.
744 	TDMetricCollection *pCollection;
745 
746 	// The metric has been registered in its current form (some metrics can change and require re-reg)
747 	bool registered;
748 };
749 
750 struct BaseEventMetric : BaseMetric {
751 
752 	BaseEventMetric(MetricNameRef const &name) : BaseMetric(name) {
753 	}
754 
755 	// Needed for MetricUtil
756 	static const StringRef metricType;
757 	Void getValue() const {
758 		return Void();
759 	}
760 	virtual ~BaseEventMetric() {}
761 
762 	// Every metric should have a set method for its underlying type in order for MetricUtil::getOrCreateInstance
763 	// to initialize it.  In the case of event metrics there is no underlying type so the underlying type
764 	// is Void and set does nothing.
765 	void set(Void const &val) {}
766 
767 	virtual StringRef getTypeName() = 0;
768 };
769 
770 template <class E>
771 struct EventMetric : E, ReferenceCounted<EventMetric<E>>, MetricUtil<EventMetric<E>>, BaseEventMetric {
772 	EventField<int64_t, TimeDescriptor> time;
773 	bool latestRecorded;
774 	decltype( tuple_map( MakeEventField(), typename Descriptor<E>::fields() ) ) values;
775 
776 	virtual void addref() { ReferenceCounted<EventMetric<E>>::addref(); }
777 	virtual void delref() { ReferenceCounted<EventMetric<E>>::delref(); }
778 
779 	EventMetric( MetricNameRef const &name, Void) : BaseEventMetric(name), latestRecorded(false) {
780 	}
781 
782 	virtual ~EventMetric() {
783 	}
784 
785 	virtual StringRef getTypeName() { return Descriptor<E>::typeName(); }
786 
787 	void onEnable() {
788 		// Must initialize fields, previously knobs may not have been set.
789 		time.init();
790 		initFields( typename Descriptor<E>::field_indexes());
791 	}
792 
793 	// Log the event.
794 	// Returns the time that was logged for the event so that it can be passed to other events that need to be time-sync'd.
795 	// NOTE:  Do NOT use the same time for two consecutive loggings of the SAME event.  This *could* cause there to be metric data
796 	// blocks such that the last timestamp of one block is equal to the first timestamp of the next, which means if a search is done
797 	// for the exact timestamp then the first event will not be found.
798 	uint64_t log(uint64_t explicitTime = 0) {
799 		if(!enabled)
800 			return 0;
801 
802 		uint64_t t = explicitTime ? explicitTime : timer_int();
803 		double x = g_random->random01();
804 
805 		int64_t l = 0;
806 		if (x == 0.0)
807 			l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
808 		else
809 			l = std::min(FLOW_KNOBS->MAX_METRIC_LEVEL-1, (int64_t)(::log(1.0/x) / FLOW_KNOBS->METRIC_LEVEL_DIVISOR));
810 
811 		if(!canLog(l))
812 			return 0;
813 
814 		bool overflow = false;
815 		int64_t bytes = 0;
816 		time.log(t, t, l, overflow, bytes);
817 		logFields( typename Descriptor<E>::field_indexes(), t, l, overflow, bytes );
818 		if(overflow) {
819 			time.nextKey(t, l);
820 			nextKeys(typename Descriptor<E>::field_indexes(), t, l);
821 		}
822 		latestRecorded = false;
823 		return t;
824 	}
825 
826 	template <size_t... Is>
827 	void logFields(index_sequence<Is...>, uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
828 #ifdef NO_INTELLISENSE
829 		auto _ = {
830 			(std::get<Is>(values).log( std::tuple_element<Is, typename Descriptor<E>::fields>::type::get( static_cast<E&>(*this) ), t, l, overflow, bytes ), Void())...
831 		};
832 #endif
833 	}
834 
835 	template <size_t... Is>
836 	void initFields(index_sequence<Is...>) {
837 #ifdef NO_INTELLISENSE
838 		auto _ = {
839 			(std::get<Is>(values).init(), Void())...
840 		};
841 #endif
842 	}
843 
844 	template <size_t... Is>
845 	void nextKeys(index_sequence<Is...>, uint64_t t, int64_t l ) {
846 #ifdef NO_INTELLISENSE
847 		auto _ = {
848 			(std::get<Is>(values).nextKey(t, l),Void())...
849 		};
850 #endif
851 	}
852 
853 	virtual void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
854 		time.flushField( mk, rollTime, batch );
855 		flushFields( typename Descriptor<E>::field_indexes(), mk, rollTime, batch );
856 		if(!latestRecorded) {
857 			batch.updates.push_back(std::make_pair(mk.packLatestKey(), StringRef()));
858 			latestRecorded = true;
859 		}
860 	}
861 
862 	template <size_t... Is>
863 	void flushFields(index_sequence<Is...>, MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch ) {
864 #ifdef NO_INTELLISENSE
865 		auto _ = {
866 			(std::get<Is>(values).flushField( mk, rollTime, batch ),Void())...
867 		};
868 #endif
869 	}
870 
871 	virtual void rollMetric( uint64_t t ) {
872 		time.rollMetric(t);
873 		rollFields( typename Descriptor<E>::field_indexes(), t );
874 	}
875 
876 	template <size_t... Is>
877 	void rollFields(index_sequence<Is...>, uint64_t t ) {
878 #ifdef NO_INTELLISENSE
879 		auto _ = {
880 			(std::get<Is>(values).rollMetric( t ),Void())...
881 		};
882 #endif
883 	}
884 
885 	virtual void registerFields( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
886 		time.registerField( mk, fieldKeys );
887 		registerFields( typename Descriptor<E>::field_indexes(), mk, fieldKeys );
888 	}
889 
890 	template <size_t... Is>
891 	void registerFields(index_sequence<Is...>, const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
892 #ifdef NO_INTELLISENSE
893 		auto _ = {
894 			(std::get<Is>(values).registerField( mk, fieldKeys ),Void())...
895 		};
896 #endif
897 	}
898 protected:
899     bool it;
900 };
901 
902 // A field Descriptor compatible with EventField but with name set at runtime
903 struct DynamicDescriptor {
904 	DynamicDescriptor(const char *name)
905 	  : _name(StringRef((uint8_t *)name, strlen(name))) {}
906 	StringRef name() const { return _name; }
907 
908 private:
909 	const Standalone<StringRef> _name;
910 };
911 
912 template<typename T>
913 struct DynamicField;
914 
915 struct DynamicFieldBase {
916 	virtual ~DynamicFieldBase() {}
917 
918 	virtual StringRef fieldName() = 0;
919 	virtual const StringRef getDerivedTypeName() = 0;
920 	virtual void init() = 0;
921 	virtual void clear() = 0;
922 	virtual void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) = 0;
923 	virtual void nextKey( uint64_t t, int level ) = 0;
924 	virtual void nextKeyAllLevels( uint64_t t) = 0;
925 	virtual void rollMetric( uint64_t t ) = 0;
926 	virtual void flushField( MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
927 	virtual void registerField( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) = 0;
928 
929 	// Set the current value of this field from the value of another
930 	virtual void setValueFrom(DynamicFieldBase *src, StringRef eventType) = 0;
931 
932 	// Create a new field of the same type and with the same current value as this one and with the given name
933 	virtual DynamicFieldBase * createNewWithValue(const char *name) = 0;
934 
935 	// This does a fairly cheap and "safe" downcast without using dynamic_cast / RTTI by checking that the pointer value
936 	// of the const char * type string is the same as getDerivedTypeName for this object.
937 	template<typename T> DynamicField<T> * safe_downcast(StringRef eventType) {
938 		if(getDerivedTypeName() == metricTypeName<T>())
939 			return (DynamicField<T> *)this;
940 
941 		TraceEvent(SevWarnAlways, "ScopeEventFieldTypeMismatch")
942 			.detail("EventType", eventType.toString())
943 			.detail("FieldName", fieldName().toString())
944 			.detail("OldType", getDerivedTypeName().toString())
945 			.detail("NewType", metricTypeName<T>().toString());
946 		return NULL;
947 	}
948 };
949 
950 template<typename T>
951 struct DynamicField : public DynamicFieldBase, EventField<T, DynamicDescriptor> {
952 	typedef EventField<T, DynamicDescriptor> EventFieldType;
953 	DynamicField(const char *name) : DynamicFieldBase(), EventFieldType(DynamicDescriptor(name)), value(T()) {}
954 	virtual ~DynamicField() {}
955 
956 	StringRef fieldName() { return EventFieldType::name(); }
957 
958 	// Get the field's datatype, this is used as a form of RTTI by DynamicFieldBase::safe_downcast()
959 	const StringRef getDerivedTypeName() { return metricTypeName<T>(); }
960 
961 	// Pure virtual implementations
962 	void clear() { value = T(); }
963 
964 	void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
965 		return EventFieldType::log(value, t, l, overflow, bytes);
966 	}
967 
968 	// Redirects to EventFieldType methods
969 	void nextKey( uint64_t t, int level ) {
970 		return EventFieldType::nextKey(t, level);
971 	}
972 	void nextKeyAllLevels( uint64_t t) {
973 		return EventFieldType::nextKeyAllLevels(t);
974 	}
975 	void rollMetric( uint64_t t ) {
976 		return EventFieldType::rollMetric(t);
977 	}
978 	void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
979 		return EventFieldType::flushField(mk, rollTime, batch);
980 	}
981 	void registerField(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys) {
982 		return EventFieldType::registerField(mk, fieldKeys);
983 	}
984 	void init() {
985 		return EventFieldType::init();
986 	}
987 
988 	// Set this field's value to the value of another field of exactly the same type.
989 	void setValueFrom(DynamicFieldBase *src, StringRef eventType) {
990 		DynamicField<T> *s = src->safe_downcast<T>(eventType);
991 		if(s != NULL)
992 			set(s->value);
993 		else
994 			clear();  // Not really necessary with proper use but just in case it is better to clear than use an old value.
995 	}
996 
997 	DynamicFieldBase * createNewWithValue(const char *name) {
998 		DynamicField<T> *n = new DynamicField<T>(name);
999 		n->set(value);
1000 		return n;
1001 	}
1002 
1003 	// Non virtuals
1004 	void set(T val) { value = val; }
1005 
1006 private:
1007 	T value;
1008 };
1009 
1010 // A DynamicEventMetric is an EventMetric whose field set can be modified at runtime.
1011 struct DynamicEventMetric : ReferenceCounted<DynamicEventMetric>, MetricUtil<DynamicEventMetric>, BaseEventMetric {
1012 private:
1013 	EventField<int64_t, TimeDescriptor> time;
1014 	bool latestRecorded;
1015 
1016 	// TODO:  A Standalone key type isn't ideal because on lookups a ref will be made Standalone just for the search
1017 	// All fields that are set with setField will be in fields.
1018 	std::map<Standalone<StringRef>, DynamicFieldBase *> fields;
1019 
1020 	// Set of fields not yet registered
1021 	std::set<Standalone<StringRef> > fieldsToRegister;
1022 
1023 	// Whether or not new fields have been added since the last logging.  fieldsToRegister can't
1024 	// be used for this because registration is independent of actually logging data.
1025 	bool newFields;
1026 
1027 	void newFieldAdded(Standalone<StringRef> const &fname) {
1028 		fieldsToRegister.insert(fname);  // So that this field will be registered when asked by the metrics logger actor later
1029 		newFields = true;                // So that log() will know that there is a new field
1030 
1031 		// Registration has now changed so set registered to false and trigger a reg change event if possible
1032 		registered = false;
1033 		if(pCollection != nullptr)
1034 			pCollection->metricRegistrationChanged.trigger();
1035 	}
1036 
1037 public:
1038 	DynamicEventMetric(MetricNameRef const &name, Void = Void());
1039     ~DynamicEventMetric();
1040 
1041 	virtual void addref() { ReferenceCounted<DynamicEventMetric>::addref(); }
1042 	virtual void delref() { ReferenceCounted<DynamicEventMetric>::delref(); }
1043 
1044 	void onEnable() {
1045 		// Must initialize fields, previously knobs may not have been set.
1046 		// Note that future fields will be okay because the field constructor will init and the knobs will be set.
1047 		time.init();
1048 		for(auto f : fields)
1049 			f.second->init();
1050 	}
1051 
1052 	// Set (or create) a new field in the event
1053 	template<typename ValueType>
1054 	void setField(const char *fieldName, const ValueType &value) {
1055 		StringRef fname((uint8_t *)fieldName, strlen(fieldName));
1056 		DynamicFieldBase *&p = fields[fname];
1057 		if (p != NULL) {
1058 			// FIXME:  This will break for DynamicEventMetric instances that are reused, such as use cases outside
1059 			// of TraceEvents.  Currently there are none in the code, and there may never any be but if you're here
1060 			// because you reused a DynamicEventMetric and got the error below then this issue must be fixed.  One
1061 			// possible solution is to have a flag in DynamicEventMetric which enables this check so that
1062 			// TraceEvent can preserve this behavior.
1063 			TraceEvent(SevError, "DuplicateTraceProperty").detail("Property", fieldName).backtrace();
1064 			if (g_network->isSimulated()) ASSERT(false);
1065 		}
1066 		p = new DynamicField<ValueType>(fieldName);
1067 		if(pCollection != nullptr)
1068 			p->init();
1069 		newFieldAdded(fname);
1070 
1071 		// This will return NULL if the datatype is wrong.
1072 		DynamicField<ValueType> *f = p->safe_downcast<ValueType>(getTypeName());
1073 		// Only set the field value if the type is correct.
1074 		// Another option here is to redefine the field to the new type and flush (roll) the existing field but that would create many keys
1075 		// with small values in the db if two frequent events keep tug-of-war'ing the types back and forth.
1076 		if(f != NULL)
1077 			f->set(value);
1078 		else
1079 			p->clear();  // Not really necessary with proper use but just in case it is better to clear than use an old value.
1080 	}
1081 
1082 	// This provides a way to first set fields in a temporary DynamicEventMetric and then push all of those field values
1083 	// into another DynamicEventMetric (which is actually logging somewhere) and log the event.
1084 	uint64_t setFieldsAndLogFrom(DynamicEventMetric *source, uint64_t explicitTime = 0) {
1085 		for(auto f : source->fields)
1086 		{
1087 			DynamicFieldBase *&p = fields[f.first];
1088 			if(p == NULL) {
1089 				p = f.second->createNewWithValue(f.first.toString().c_str());
1090 				if(pCollection != nullptr)
1091 					p->init();
1092 				newFieldAdded(f.first);
1093 			}
1094 			else
1095 				p->setValueFrom(f.second, getTypeName());
1096 		}
1097 		return log(explicitTime);
1098 	}
1099 
1100 	StringRef getTypeName() { return metricName.name; }
1101 
1102 	// Set all of the fields to their default values.
1103 	void clearFields() {
1104 		for(auto f : fields)
1105 			f.second->clear();
1106 	}
1107 
1108 	uint64_t log(uint64_t explicitTime = 0);
1109 
1110 	// Virtual function implementations
1111 	void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch);
1112 	void rollMetric( uint64_t t );
1113 	void registerFields(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys);
1114 };
1115 
1116 // Continuous metrics are a single-field metric using an EventField<TimeAndValue<T>>
1117 template <typename T>
1118 struct TimeAndValue {
1119 	TimeAndValue() : time(0), value() {}
1120 	int64_t time;
1121 	T value;
1122 
1123 	// The metric field type for TimeAndValue is just the Value type.
1124 	static inline const StringRef metric_field_type() { return metricTypeName<T>(); }
1125 };
1126 
1127 // FieldHeader for continuous metrics, works for T = int, double, bool
1128 template <typename T>
1129 struct FieldHeader<TimeAndValue<T>> {
1130 	FieldHeader() : version(1), count(0), area(0), previous_time(0) {}
1131 	uint8_t version;
1132 	int64_t count;
1133 	// If T is a floating point type then area is a double, otherwise it's an int64_t
1134 	typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type area;
1135 	int64_t previous_time;
1136 
1137 	void update(FieldHeader const &h) {
1138 		count += h.count;
1139 		area += h.area;
1140 	}
1141 	void update(TimeAndValue<T> const &v) {
1142 		++count;
1143 		if(previous_time > 0)
1144 			area += v.value * (v.time - previous_time);
1145 		previous_time = v.time;
1146 	}
1147 	template<class Ar> void serialize(Ar &ar) {
1148 		serializer(ar, version);
1149 		ASSERT(version == 1);
1150 		serializer(ar, count, area);
1151 	}
1152 };
1153 
1154 template <> inline void FieldHeader<TimeAndValue<Standalone<StringRef>>>::update(TimeAndValue<Standalone<StringRef>> const &v) {
1155 	++count;
1156 	area += v.value.size();
1157 }
1158 
1159 // ValueBlock encoder/decoder for continuous metrics which have a type of TimeAndValue<T>
1160 // Uses encodings for int64_t and T and encodes (time, value, [time, value]...)
1161 template <typename T>
1162 struct FieldValueBlockEncoding<TimeAndValue<T>> {
1163 	FieldValueBlockEncoding() : time_encoding(), value_encoding() {}
1164 	inline void write(BinaryWriter &w, TimeAndValue<T> const &v) {
1165 		time_encoding.write(w, v.time);
1166 		value_encoding.write(w, v.value);
1167 	}
1168 	TimeAndValue<T> read(BinaryReader &r) {
1169 		TimeAndValue<T> result;
1170 		result.time = time_encoding.read(r);
1171 		result.value = value_encoding.read(r);
1172 		return result;
1173 	}
1174 	FieldValueBlockEncoding<int64_t> time_encoding;
1175 	FieldValueBlockEncoding<T> value_encoding;
1176 };
1177 
1178 // ValueBlock encoder/decoder specialization for continuous bool metrics because they are encoded
1179 // more efficiently than encoding the time and bool types separately.
1180 // Instead, time and value are combined to a single value (time delta << 1) + (value ? 1 : 0) and then
1181 // that value is encoded as a delta.
1182 template <>
1183 struct FieldValueBlockEncoding<TimeAndValue<bool>> {
1184 	FieldValueBlockEncoding() : prev(), prev_combined(0) {}
1185 	inline void write(BinaryWriter &w, TimeAndValue<bool> const &v) {
1186 		int64_t combined = (v.time << 1) | (v.value ? 1 : 0);
1187 		w << CompressedInt<int64_t>(combined - prev_combined);
1188 		prev = v;
1189 		prev_combined = combined;
1190 	}
1191 	TimeAndValue<bool> read(BinaryReader &r) {
1192 		CompressedInt<int64_t> d;
1193 		r >> d;
1194 		prev_combined += d.value;
1195 		prev.value = prev_combined & 1;
1196 		prev.time = prev_combined << 1;
1197 		return prev;
1198 	}
1199 	TimeAndValue<bool> prev;
1200 	int64_t prev_combined;
1201 };
1202 
1203 template <typename T>
1204 struct ContinuousMetric: NonCopyable, ReferenceCounted<ContinuousMetric<T>>, MetricUtil<ContinuousMetric<T>, T>, BaseMetric {
1205 	// Needed for MetricUtil
1206 	static const StringRef metricType;
1207 
1208 private:
1209 	EventField<TimeAndValue<T>> field;
1210 	TimeAndValue<T> tv;
1211 	bool recorded;
1212 
1213 public:
1214 	ContinuousMetric(MetricNameRef const &name, T const &initial)
1215 	  : BaseMetric(name), recorded(false) {
1216 		tv.value = initial;
1217 	}
1218 
1219 	virtual void addref() { ReferenceCounted<ContinuousMetric<T>>::addref(); }
1220 	virtual void delref() { ReferenceCounted<ContinuousMetric<T>>::delref(); }
1221 
1222 	T getValue() const {
1223 		return tv.value;
1224 	}
1225 
1226 	void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
1227 		if( !recorded ) {
1228 			batch.updates.push_back(std::make_pair(mk.packLatestKey(), getLatestAsValue()));
1229 			recorded = true;
1230 		}
1231 
1232 		field.flushField(mk, rollTime, batch);
1233 	}
1234 
1235 	void rollMetric(uint64_t t) {
1236 		field.rollMetric(t);
1237 	}
1238 
1239 	Standalone<StringRef> getLatestAsValue() {
1240 		FieldValueBlockEncoding< TimeAndValue< T > > enc;
1241 		BinaryWriter wr(AssumeVersion(currentProtocolVersion));
1242 		// Write a header so the client can treat this value like a normal data value block.
1243 		// TOOD: If it is useful, this could be the current header value of the most recently logged level.
1244 		wr << FieldHeader<TimeAndValue<T>>();
1245 		enc.write(wr, tv);
1246 		return wr.toValue();
1247 	}
1248 
1249 	void onEnable() {
1250 		field.init();
1251 		change();
1252 	}
1253 
1254 	void onDisable() {
1255 		change();
1256 	}
1257 
1258 	void set(const T &v) {
1259 		if(v != tv.value) {
1260 			if(enabled)
1261 				change();
1262 			tv.value = v;
1263 		}
1264 	}
1265 
1266 	// requires += on T
1267 	void add(const T &delta) {
1268 		if(delta != T()) {
1269 			if(enabled)
1270 				change();
1271 			tv.value += delta;
1272 		}
1273 	}
1274 
1275 	// requires ! on T
1276 	void toggle() {
1277 		if(enabled)
1278 			change();
1279 		tv.value = !tv.value;
1280 	}
1281 
1282 	void change() {
1283 		uint64_t toggleTime = timer_int();
1284 		int64_t bytes = 0;
1285 
1286 		if(tv.time != 0) {
1287 			double x = g_random->random01();
1288 
1289 			int64_t l = 0;
1290 			if (x == 0.0)
1291 				l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
1292 			else if (toggleTime != tv.time)
1293 				l = std::min(
1294 					FLOW_KNOBS->MAX_METRIC_LEVEL-1,
1295 					(int64_t)(
1296 						log((toggleTime - tv.time) / x) /
1297 							FLOW_KNOBS->METRIC_LEVEL_DIVISOR
1298 					)
1299 				);
1300 
1301 			if(!canLog(l))
1302 				return;
1303 
1304 			bool overflow = false;
1305 			field.log(tv, tv.time, l, overflow, bytes);
1306 			if(overflow)
1307 				field.nextKey(toggleTime, l);
1308 		}
1309 		tv.time = toggleTime;
1310 		recorded = false;
1311 		TDMetricCollection::getTDMetrics()->checkRoll(tv.time, bytes);
1312 	}
1313 };
1314 
1315 typedef ContinuousMetric<int64_t> Int64Metric;
1316 typedef Int64Metric VersionMetric;
1317 typedef ContinuousMetric<bool> BoolMetric;
1318 typedef ContinuousMetric<Standalone<StringRef>> StringMetric;
1319 
1320 // MetricHandle / EventMetricHandle are wrappers for a Reference<MetricType> which provides
1321 // the following interface conveniences
1322 //
1323 //   * The underlying metric reference is always initialized to a valid object.  That valid object
1324 //     may not actually be in a metric collection and therefore may not actually be able to write
1325 //     data to a database, but it will work in other ways (i.e. int metrics will act like integers).
1326 //
1327 //   * Operator =, ++, --, +=, and -= can be used as though the handle is an object of the MetricType::ValueType of
1328 //     the metric type for which it is a handle.
1329 //
1330 //   * Operator -> is defined such that the MetricHandle acts like a pointer to the underlying MetricType
1331 //
1332 //   * Cast operator to MetricType::ValueType is defined so that the handle will act like a MetricType::ValueType
1333 //
1334 //   * The last three features allow, for example, a MetricHandle<Int64Metric> to be a drop-in replacement for an int64_t.
1335 //
1336 template <typename T>
1337 struct MetricHandle {
1338 	template<typename ValueType = typename T::ValueType>
1339 	MetricHandle(StringRef const &name = StringRef(), StringRef const &id = StringRef(), ValueType const &initial = ValueType())
1340 	  : ref(T::getOrCreateInstance(name, id, true, initial)) {
1341 	}
1342 
1343 	// Initialize this handle to point to a new or existing metric with (name, id).  If a new metric is created then the handle's
1344 	// current metric's current value will be the new metric's initial value.  This allows Metric handle users to treate their
1345 	// Metric variables as normal variables and then bind them to actual logging metrics later while continuing with the current value.
1346 	void init(StringRef const &name, StringRef const &id = StringRef()) {
1347 		ref = T::getOrCreateInstance(name, id, true, ref->getValue());
1348 	}
1349 
1350 	void init(StringRef const &name, StringRef const &id, typename T::ValueType const &initial) {
1351 		ref = T::getOrCreateInstance(name, id, true, initial);
1352 	}
1353 
1354 	void operator=(typename T::ValueType const &v) {
1355 		ref->set(v);
1356 	}
1357 	void operator++() {
1358 		ref->add(1);
1359 	}
1360 	void operator++(int) {
1361 		ref->add(1);
1362 	}
1363 	void operator--() {
1364 		ref->add(-1);
1365 	}
1366 	void operator--(int) {
1367 		ref->add(-1);
1368 	}
1369 	void operator+=(typename T::ValueType const &v) {
1370 		ref->add(v);
1371 	}
1372 	void operator-=(typename T::ValueType const &v) {
1373 		ref->add(-v);
1374 	}
1375 
1376 	T * operator-> () { return ref.getPtr(); }
1377 
1378 	operator typename T::ValueType () const { return ref->getValue(); }
1379 	typename T::ValueType getValue() const  { return ref->getValue(); }
1380 
1381 	Reference<T> ref;
1382 };
1383 
1384 template<class T>
1385 struct Traceable<MetricHandle<T>> : Traceable<typename T::ValueType> {
1386 	static std::string toString(const MetricHandle<T>& value) {
1387 		return Traceable<typename T::ValueType>::toString(value.getValue());
1388 	}
1389 };
1390 
1391 template<class T>
1392 struct SpecialTraceMetricType<MetricHandle<T>> : SpecialTraceMetricType<typename T::ValueType> {
1393 	using parent = SpecialTraceMetricType<typename T::ValueType>;
1394 	static auto getValue(const MetricHandle<T>& value) -> decltype(parent::getValue(value.getValue())) {
1395 		return parent::getValue(value.getValue());
1396 	}
1397 };
1398 
1399 typedef MetricHandle<Int64Metric> Int64MetricHandle;
1400 typedef MetricHandle<VersionMetric> VersionMetricHandle;
1401 typedef MetricHandle<BoolMetric> BoolMetricHandle;
1402 typedef MetricHandle<StringMetric> StringMetricHandle;
1403 
1404 template <typename E>
1405 using EventMetricHandle = MetricHandle<EventMetric<E>>;
1406 
1407 #include "flow/unactorcompiler.h"
1408 
1409 #endif
1410