1package phony 2 3import ( 4 "sync/atomic" 5 "unsafe" 6) 7 8// A message in the queue 9type queueElem struct { 10 msg interface{} // func() or func() bool 11 next unsafe.Pointer // *queueElem, accessed atomically 12 count uint8 13} 14 15// Inbox is an ordered queue of messages which an Actor will process sequentially. 16// Messages are meant to be in the form of non-blocking functions of 0 arguments, often closures. 17// The intent is for the Inbox struct to be embedded in other structs to satisfy the Actor interface, where the other fields of the struct are owned by the Actor. 18// It is up to the user to ensure that memory is used safely, and that messages do not contain blocking operations. 19// An Inbox must not be copied after first use. 20type Inbox struct { 21 head *queueElem // Used carefully to avoid needing atomics 22 tail unsafe.Pointer // *queueElem, accessed atomically 23} 24 25// Actor is the interface for Actors, based on their ability to receive a message from another Actor. 26// It's meant so that structs which embed an Inbox can satisfy a mutually compatible interface for message passing. 27type Actor interface { 28 Act(Actor, func()) 29 enqueue(interface{}) bool 30 restart() 31 advance() bool 32} 33 34// enqueue puts a message on the Actor's inbox queue and returns the number of messages that have been enqueued since the inbox was last empty. 35// If the inbox was empty, then the actor was not already running, so enqueue starts it. 36func (a *Inbox) enqueue(msg interface{}) bool { 37 if msg == nil { 38 panic("tried to send nil message") 39 } 40 q := &queueElem{msg: msg} 41 var tail *queueElem 42 for { 43 q.count = 0 44 tail = (*queueElem)(atomic.LoadPointer(&a.tail)) 45 if tail != nil { 46 q.count = tail.count + 1 47 if q.count == 0 { 48 q.count = ^q.count 49 } 50 } 51 if atomic.CompareAndSwapPointer(&a.tail, unsafe.Pointer(tail), unsafe.Pointer(q)) { 52 break 53 } 54 } 55 if tail != nil { 56 //An old tail exists, so update its next pointer to reference q 57 atomic.StorePointer(&tail.next, unsafe.Pointer(q)) 58 } else { 59 // No old tail existed, so no worker is currently running 60 // Update the head to point to q, then start the worker 61 a.head = q 62 a.restart() 63 } 64 return q.count >= backpressureThreshold 65} 66 67// Act adds a message to an Actor's Inbox which tells the Actor to execute the provided function at some point in the future. 68// When one Actor sends a message to another, the sender is meant to provide itself as the first argument to this function. 69// If the receiver's Inbox contains too many messages, and the sender argument is non-nil, then the sender is scheduled to pause at a safe point in the future, until the receiver has finished running the action. 70// A nil first argument is valid, and will prevent any scheduling changes from happening, in cases where an Actor wants to send a message to itself (where this scheduling is just useless overhead) or must receive a message from non-Actor code. 71func (a *Inbox) Act(from Actor, action func()) { 72 if a.enqueue(action) && from != nil { 73 var s stop 74 a.enqueue(func() { 75 if !s.stop() && from.advance() { 76 from.restart() 77 } 78 }) 79 from.enqueue(s.stop) 80 } 81} 82 83// Block adds a message to an Actor's Inbox which tells the Actor to execute the provided function at some point in the future. 84// It then blocks until the actor has finished running the provided function. 85// Block meant exclusively as a convenience function for non-Actor code to send messages and wait for responses. 86// If an Actor calls Block, then it may cause a deadlock, so Act should always be used instead. 87func Block(actor Actor, action func()) { 88 done := make(chan struct{}) 89 actor.Act(nil, func() { action(); close(done) }) 90 <-done 91} 92 93// run is executed when a message is placed in an empty Inbox, and launches a worker goroutine. 94// The worker goroutine processes messages from the Inbox until empty, and then exits. 95func (a *Inbox) run() { 96 running := true 97 for running { 98 switch msg := a.head.msg.(type) { 99 case func() bool: // used internally by backpressure 100 if msg() { 101 return 102 } 103 case func(): // all external use from Act 104 msg() 105 } 106 running = a.advance() 107 } 108} 109 110// returns true if we still have more work to do 111func (a *Inbox) advance() bool { 112 head := a.head 113 for { 114 a.head = (*queueElem)(atomic.LoadPointer(&head.next)) 115 if a.head != nil { 116 // Move to the next message 117 return true // more left to do 118 } else if !atomic.CompareAndSwapPointer(&a.tail, unsafe.Pointer(head), nil) { 119 // The head is not the tail, but there was no head.next when we checked 120 // Somebody must be updating it right now, so try again 121 continue 122 } else { 123 // Head and tail are now both nil, our work here is done, exit 124 return false // done processing messages 125 } 126 } 127} 128 129func (a *Inbox) restart() { 130 go a.run() 131} 132 133type stop uint32 134 135func (s *stop) stop() bool { 136 return atomic.SwapUint32((*uint32)(s), 1) == 0 137} 138