1package phony
2
3import (
4	"sync/atomic"
5	"unsafe"
6)
7
// queueElem is a single node in an Inbox's singly linked queue of messages.
type queueElem struct {
	// msg is the queued message: either a func() or a func() bool.
	msg interface{}
	// next holds a *queueElem pointing at the following node, or nil if no
	// node has been linked after this one yet. It is accessed atomically.
	next unsafe.Pointer
	// count is assigned by enqueue: 0 for a message placed in an empty queue,
	// otherwise the previous tail's count plus one, saturating at 255.
	count uint8
}
14
15// Inbox is an ordered queue of messages which an Actor will process sequentially.
16// Messages are meant to be in the form of non-blocking functions of 0 arguments, often closures.
17// The intent is for the Inbox struct to be embedded in other structs to satisfy the Actor interface, where the other fields of the struct are owned by the Actor.
18// It is up to the user to ensure that memory is used safely, and that messages do not contain blocking operations.
19// An Inbox must not be copied after first use.
20type Inbox struct {
21	head *queueElem     // Used carefully to avoid needing atomics
22	tail unsafe.Pointer // *queueElem, accessed atomically
23}
24
// Actor is the interface for Actors, based on their ability to receive a message from another Actor.
// It exists so that structs which embed an Inbox satisfy a mutually compatible interface for message passing.
// The interface includes unexported methods, so it cannot be implemented from scratch outside this package; embedding an Inbox is the way to satisfy it.
type Actor interface {
	// Act asks the receiving Actor to run the given function at some point in
	// the future. The first argument is the sending Actor, or nil.
	Act(Actor, func())
	// enqueue adds a message to the receiver's queue and reports whether
	// backpressure should be applied to the sender.
	enqueue(interface{}) bool
	// restart resumes processing of the receiver's queued messages.
	restart()
	// advance moves past the current message and reports whether another
	// message is waiting to be processed.
	advance() bool
}
33
34// enqueue puts a message on the Actor's inbox queue and returns the number of messages that have been enqueued since the inbox was last empty.
35// If the inbox was empty, then the actor was not already running, so enqueue starts it.
36func (a *Inbox) enqueue(msg interface{}) bool {
37	if msg == nil {
38		panic("tried to send nil message")
39	}
40	q := &queueElem{msg: msg}
41	var tail *queueElem
42	for {
43		q.count = 0
44		tail = (*queueElem)(atomic.LoadPointer(&a.tail))
45		if tail != nil {
46			q.count = tail.count + 1
47			if q.count == 0 {
48				q.count = ^q.count
49			}
50		}
51		if atomic.CompareAndSwapPointer(&a.tail, unsafe.Pointer(tail), unsafe.Pointer(q)) {
52			break
53		}
54	}
55	if tail != nil {
56		//An old tail exists, so update its next pointer to reference q
57		atomic.StorePointer(&tail.next, unsafe.Pointer(q))
58	} else {
59		// No old tail existed, so no worker is currently running
60		// Update the head to point to q, then start the worker
61		a.head = q
62		a.restart()
63	}
64	return q.count >= backpressureThreshold
65}
66
67// Act adds a message to an Actor's Inbox which tells the Actor to execute the provided function at some point in the future.
68// When one Actor sends a message to another, the sender is meant to provide itself as the first argument to this function.
69// If the receiver's Inbox contains too many messages, and the sender argument is non-nil, then the sender is scheduled to pause at a safe point in the future, until the receiver has finished running the action.
70// A nil first argument is valid, and will prevent any scheduling changes from happening, in cases where an Actor wants to send a message to itself (where this scheduling is just useless overhead) or must receive a message from non-Actor code.
71func (a *Inbox) Act(from Actor, action func()) {
72	if a.enqueue(action) && from != nil {
73		var s stop
74		a.enqueue(func() {
75			if !s.stop() && from.advance() {
76				from.restart()
77			}
78		})
79		from.enqueue(s.stop)
80	}
81}
82
83// Block adds a message to an Actor's Inbox which tells the Actor to execute the provided function at some point in the future.
84// It then blocks until the actor has finished running the provided function.
85// Block meant exclusively as a convenience function for non-Actor code to send messages and wait for responses.
86// If an Actor calls Block, then it may cause a deadlock, so Act should always be used instead.
87func Block(actor Actor, action func()) {
88	done := make(chan struct{})
89	actor.Act(nil, func() { action(); close(done) })
90	<-done
91}
92
93// run is executed when a message is placed in an empty Inbox, and launches a worker goroutine.
94// The worker goroutine processes messages from the Inbox until empty, and then exits.
95func (a *Inbox) run() {
96	running := true
97	for running {
98		switch msg := a.head.msg.(type) {
99		case func() bool: // used internally by backpressure
100			if msg() {
101				return
102			}
103		case func(): // all external use from Act
104			msg()
105		}
106		running = a.advance()
107	}
108}
109
110// returns true if we still have more work to do
111func (a *Inbox) advance() bool {
112	head := a.head
113	for {
114		a.head = (*queueElem)(atomic.LoadPointer(&head.next))
115		if a.head != nil {
116			// Move to the next message
117			return true // more left to do
118		} else if !atomic.CompareAndSwapPointer(&a.tail, unsafe.Pointer(head), nil) {
119			// The head is not the tail, but there was no head.next when we checked
120			// Somebody must be updating it right now, so try again
121			continue
122		} else {
123			// Head and tail are now both nil, our work here is done, exit
124			return false // done processing messages
125		}
126	}
127}
128
129func (a *Inbox) restart() {
130	go a.run()
131}
132
// stop is a one-shot flag: 0 until the stop method is first called, 1 afterwards.
type stop uint32

// stop atomically sets the flag and reports whether this call was the one that set it.
// Only the first call returns true; every later call returns false.
func (s *stop) stop() bool {
	return atomic.SwapUint32((*uint32)(s), 1) == 0
}
138