1package diodes
2
3import (
4	"log"
5	"sync/atomic"
6	"unsafe"
7)
8
// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
	// writeIndex is the sequence number most recently claimed by a writer.
	// Set advances it with atomic.AddUint64. Keep it as the first field:
	// sync/atomic requires 64-bit alignment for 64-bit operations on 32-bit
	// platforms, and only the first word of an allocated struct is
	// guaranteed to be aligned that way.
	writeIndex uint64

	// buffer is the ring of slots. Each slot is either nil or a pointer to a
	// bucket and is only accessed through sync/atomic pointer operations.
	buffer []unsafe.Pointer

	// readIndex is the sequence number TryNext expects to read next. It is
	// read and written without atomics, which is why only a single reader
	// is supported.
	readIndex uint64

	// alerter is called by TryNext with the number of sequence numbers that
	// were skipped when the reader has to fast forward.
	alerter Alerter
}
17
18// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
19// is optimzed for many writers (on go-routines B-n) and a single reader
20// (on go-routine A). The alerter is invoked on the read's go-routine. It is
21// called when it notices that the writer go-routine has passed it and wrote
22// over data. A nil can be used to ignore alerts.
23func NewManyToOne(size int, alerter Alerter) *ManyToOne {
24	if alerter == nil {
25		alerter = AlertFunc(func(int) {})
26	}
27
28	d := &ManyToOne{
29		buffer:  make([]unsafe.Pointer, size),
30		alerter: alerter,
31	}
32
33	// Start write index at the value before 0
34	// to allow the first write to use AddUint64
35	// and still have a beginning index of 0
36	d.writeIndex = ^d.writeIndex
37	return d
38}
39
40// Set sets the data in the next slot of the ring buffer.
41func (d *ManyToOne) Set(data GenericDataType) {
42	for {
43		writeIndex := atomic.AddUint64(&d.writeIndex, 1)
44		idx := writeIndex % uint64(len(d.buffer))
45		old := atomic.LoadPointer(&d.buffer[idx])
46
47		if old != nil &&
48			(*bucket)(old) != nil &&
49			(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
50			log.Println("Diode set collision: consider using a larger diode")
51			continue
52		}
53
54		newBucket := &bucket{
55			data: data,
56			seq:  writeIndex,
57		}
58
59		if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
60			log.Println("Diode set collision: consider using a larger diode")
61			continue
62		}
63
64		return
65	}
66}
67
68// TryNext will attempt to read from the next slot of the ring buffer.
69// If there is not data available, it will return (nil, false).
70func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
71	// Read a value from the ring buffer based on the readIndex.
72	idx := d.readIndex % uint64(len(d.buffer))
73	result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
74
75	// When the result is nil that means the writer has not had the
76	// opportunity to write a value into the diode. This value must be ignored
77	// and the read head must not increment.
78	if result == nil {
79		return nil, false
80	}
81
82	// When the seq value is less than the current read index that means a
83	// value was read from idx that was previously written but has since has
84	// been dropped. This value must be ignored and the read head must not
85	// increment.
86	//
87	// The simulation for this scenario assumes the fast forward occurred as
88	// detailed below.
89	//
90	// 5. The reader reads again getting seq 5. It then reads again expecting
91	//    seq 6 but gets seq 2. This is a read of a stale value that was
92	//    effectively "dropped" so the read fails and the read head stays put.
93	//    `| 4 | 5 | 2 | 3 |` r: 7, w: 6
94	//
95	if result.seq < d.readIndex {
96		return nil, false
97	}
98
99	// When the seq value is greater than the current read index that means a
100	// value was read from idx that overwrote the value that was expected to
101	// be at this idx. This happens when the writer has lapped the reader. The
102	// reader needs to catch up to the writer so it moves its write head to
103	// the new seq, effectively dropping the messages that were not read in
104	// between the two values.
105	//
106	// Here is a simulation of this scenario:
107	//
108	// 1. Both the read and write heads start at 0.
109	//    `| nil | nil | nil | nil |` r: 0, w: 0
110	// 2. The writer fills the buffer.
111	//    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
112	// 3. The writer laps the read head.
113	//    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
114	// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
115	//    this forces the reader to fast forward to 5.
116	//    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
117	//
118	if result.seq > d.readIndex {
119		dropped := result.seq - d.readIndex
120		d.readIndex = result.seq
121		d.alerter.Alert(int(dropped))
122	}
123
124	// Only increment read index if a regular read occurred (where seq was
125	// equal to readIndex) or a value was read that caused a fast forward
126	// (where seq was greater than readIndex).
127	//
128	d.readIndex++
129	return result.data, true
130}
131