1package lv 2 3import "sync" 4 5// NewSpace returns an N-dimensional vector space. 6func NewSpace() *Space { 7 return &Space{} 8} 9 10// Space represents an N-dimensional vector space. Each name and unique label 11// value pair establishes a new dimension and point within that dimension. Order 12// matters, i.e. [a=1 b=2] identifies a different timeseries than [b=2 a=1]. 13type Space struct { 14 mtx sync.RWMutex 15 nodes map[string]*node 16} 17 18// Observe locates the time series identified by the name and label values in 19// the vector space, and appends the value to the list of observations. 20func (s *Space) Observe(name string, lvs LabelValues, value float64) { 21 s.nodeFor(name).observe(lvs, value) 22} 23 24// Add locates the time series identified by the name and label values in 25// the vector space, and appends the delta to the last value in the list of 26// observations. 27func (s *Space) Add(name string, lvs LabelValues, delta float64) { 28 s.nodeFor(name).add(lvs, delta) 29} 30 31// Walk traverses the vector space and invokes fn for each non-empty time series 32// which is encountered. Return false to abort the traversal. 33func (s *Space) Walk(fn func(name string, lvs LabelValues, observations []float64) bool) { 34 s.mtx.RLock() 35 defer s.mtx.RUnlock() 36 for name, node := range s.nodes { 37 f := func(lvs LabelValues, observations []float64) bool { return fn(name, lvs, observations) } 38 if !node.walk(LabelValues{}, f) { 39 return 40 } 41 } 42} 43 44// Reset empties the current space and returns a new Space with the old 45// contents. Reset a Space to get an immutable copy suitable for walking. 46func (s *Space) Reset() *Space { 47 s.mtx.Lock() 48 defer s.mtx.Unlock() 49 n := NewSpace() 50 n.nodes, s.nodes = s.nodes, n.nodes 51 return n 52} 53 54func (s *Space) nodeFor(name string) *node { 55 s.mtx.Lock() 56 defer s.mtx.Unlock() 57 if s.nodes == nil { 58 s.nodes = map[string]*node{} 59 } 60 n, ok := s.nodes[name] 61 if !ok { 62 n = &node{} 63 s.nodes[name] = n 64 } 65 return n 66} 67 68// node exists at a specific point in the N-dimensional vector space of all 69// possible label values. The node collects observations and has child nodes 70// with greater specificity. 71type node struct { 72 mtx sync.RWMutex 73 observations []float64 74 children map[pair]*node 75} 76 77type pair struct{ label, value string } 78 79func (n *node) observe(lvs LabelValues, value float64) { 80 n.mtx.Lock() 81 defer n.mtx.Unlock() 82 if len(lvs) <= 0 { 83 n.observations = append(n.observations, value) 84 return 85 } 86 if len(lvs) < 2 { 87 panic("too few LabelValues; programmer error!") 88 } 89 head, tail := pair{lvs[0], lvs[1]}, lvs[2:] 90 if n.children == nil { 91 n.children = map[pair]*node{} 92 } 93 child, ok := n.children[head] 94 if !ok { 95 child = &node{} 96 n.children[head] = child 97 } 98 child.observe(tail, value) 99} 100 101func (n *node) add(lvs LabelValues, delta float64) { 102 n.mtx.Lock() 103 defer n.mtx.Unlock() 104 if len(lvs) <= 0 { 105 var value float64 106 if len(n.observations) > 0 { 107 value = last(n.observations) + delta 108 } else { 109 value = delta 110 } 111 n.observations = append(n.observations, value) 112 return 113 } 114 if len(lvs) < 2 { 115 panic("too few LabelValues; programmer error!") 116 } 117 head, tail := pair{lvs[0], lvs[1]}, lvs[2:] 118 if n.children == nil { 119 n.children = map[pair]*node{} 120 } 121 child, ok := n.children[head] 122 if !ok { 123 child = &node{} 124 n.children[head] = child 125 } 126 child.add(tail, delta) 127} 128 129func (n *node) walk(lvs LabelValues, fn func(LabelValues, []float64) bool) bool { 130 n.mtx.RLock() 131 defer n.mtx.RUnlock() 132 if len(n.observations) > 0 && !fn(lvs, n.observations) { 133 return false 134 } 135 for p, child := range n.children { 136 if !child.walk(append(lvs, p.label, p.value), fn) { 137 return false 138 } 139 } 140 return true 141} 142 143func last(a []float64) float64 { 144 return a[len(a)-1] 145} 146