1package lasr 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7 8 bolt "go.etcd.io/bbolt" 9) 10 11// Q is a persistent message queue. Its methods are goroutine-safe. 12// Q retains the data that is sent to it until messages are acked (or nacked 13// without retry) 14type Q struct { 15 db *bolt.DB 16 name []byte 17 seq Sequencer 18 keys bucketKeys 19 messages *fifo 20 closed chan struct{} 21 inFlight sync.WaitGroup 22 waker *waker 23 optsApplied bool 24 mu sync.RWMutex 25} 26 27type bucketKeys struct { 28 ready []byte 29 returned []byte 30 unacked []byte 31 delayed []byte 32 waiting []byte 33 blockedOn []byte 34 blocking []byte 35} 36 37// Close closes q. When q is closed, Send, Receive, and Close will return 38// ErrQClosed. Close blocks until all messages in the "unacked" state are Acked 39// or Nacked. 40func (q *Q) Close() error { 41 q.messages.Lock() 42 defer q.messages.Unlock() 43 select { 44 case <-q.closed: 45 return ErrQClosed 46 default: 47 close(q.closed) 48 } 49 q.inFlight.Wait() 50 return q.equilibrate() 51} 52 53func (q *Q) isClosed() bool { 54 select { 55 case <-q.closed: 56 return true 57 default: 58 return false 59 } 60} 61 62func (q *Q) String() string { 63 return fmt.Sprintf("Q{Name: %q}", string(q.name)) 64} 65 66// NewQ creates a new Q. Only one queue should be created in a given bolt db, 67// unless compaction is disabled. 68func NewQ(db *bolt.DB, name string, options ...Option) (*Q, error) { 69 bName := []byte(name) 70 closed := make(chan struct{}) 71 q := &Q{ 72 db: db, 73 name: bName, 74 keys: bucketKeys{ 75 ready: []byte("ready"), 76 unacked: []byte("unacked"), 77 delayed: []byte("delayed"), 78 waiting: []byte("waiting"), 79 blockedOn: []byte("blockedOn"), 80 blocking: []byte("blocking"), 81 }, 82 waker: newWaker(closed), 83 closed: closed, 84 } 85 for _, o := range options { 86 if err := o(q); err != nil { 87 return nil, fmt.Errorf("lasr: couldn't create Q: %s", err) 88 } 89 } 90 q.optsApplied = true 91 if err := q.init(); err != nil { 92 return nil, err 93 } 94 return q, nil 95} 96 97func (q *Q) init() error { 98 if q.messages == nil { 99 q.messages = newFifo(1) 100 } 101 return q.equilibrate() 102} 103 104func (q *Q) equilibrate() error { 105 q.mu.RLock() 106 defer q.mu.RUnlock() 107 return q.db.Update(func(tx *bolt.Tx) error { 108 bucket, err := q.bucket(tx, q.keys.ready) 109 if err != nil { 110 return err 111 } 112 readyKeys := bucket.Stats().KeyN 113 unacked, err := q.bucket(tx, q.keys.unacked) 114 if err != nil { 115 return err 116 } 117 cursor := unacked.Cursor() 118 // put unacked messages from previous session back in the queue 119 for k, v := cursor.First(); k != nil; k, v = cursor.Next() { 120 if err := bucket.Put(k, v); err != nil { 121 return err 122 } 123 readyKeys++ 124 } 125 if readyKeys > 0 && !q.isClosed() { 126 q.waker.Wake() 127 } 128 q.messages.Drain() 129 root, err := tx.CreateBucketIfNotExists(q.name) 130 if err != nil { 131 return err 132 } 133 if len(q.keys.delayed) > 0 && !q.isClosed() { 134 // WakeAt for all delayed messages 135 delayed, err := q.bucket(tx, q.keys.delayed) 136 if err != nil { 137 return err 138 } 139 delayC := delayed.Cursor() 140 for k, _ := delayC.First(); k != nil; k, _ = delayC.Next() { 141 var id Uint64ID 142 if err := id.UnmarshalBinary(k); err != nil { 143 return fmt.Errorf("error reading delayed key %v: %s", k, err) 144 } 145 q.waker.WakeAt(time.Unix(0, int64(id))) 146 } 147 } 148 // Delete the unacked bucket now that the unacked messages have been 149 // returned to the ready bucket. 150 return root.DeleteBucket(q.keys.unacked) 151 }) 152} 153 154type bucketer interface { 155 CreateBucketIfNotExists([]byte) (*bolt.Bucket, error) 156 Bucket([]byte) *bolt.Bucket 157} 158 159func (q *Q) bucket(tx *bolt.Tx, key []byte) (*bolt.Bucket, error) { 160 bucket, err := tx.CreateBucketIfNotExists(q.name) 161 if err != nil { 162 return nil, err 163 } 164 bucket, err = bucket.CreateBucketIfNotExists(key) 165 if err != nil { 166 return nil, err 167 } 168 return bucket, nil 169} 170