1package lasr
2
3import (
4	"fmt"
5	"sync"
6	"time"
7
8	bolt "go.etcd.io/bbolt"
9)
10
// Q is a persistent message queue. Its methods are goroutine-safe.
// Q retains the data that is sent to it until messages are acked (or nacked
// without retry).
type Q struct {
	// db is the bolt database that backs the queue; equilibrate runs its
	// update transaction against it.
	db          *bolt.DB
	// name is the key of the queue's top-level bucket. Every bucket returned
	// by the bucket method is created underneath it.
	name        []byte
	// seq is not used in the code visible here; presumably it produces message
	// IDs — TODO confirm against Sequencer's definition.
	seq         Sequencer
	// keys holds the names of the buckets nested under the top-level bucket.
	keys        bucketKeys
	// messages is the in-memory fifo. init falls back to newFifo(1) when no
	// option has set it, Close holds its lock, and equilibrate drains it.
	messages    *fifo
	// closed is closed by Close to mark the queue as shut down. The same
	// channel is handed to the waker when the queue is constructed.
	closed      chan struct{}
	// inFlight is waited on by Close before the queue is equilibrated.
	inFlight    sync.WaitGroup
	// waker is signalled by equilibrate: immediately when the ready bucket is
	// non-empty, and via WakeAt for each key in the delayed bucket.
	waker       *waker
	// optsApplied is set to true by NewQ once every Option has been applied.
	optsApplied bool
	// mu is read-locked by equilibrate for the duration of its transaction.
	mu          sync.RWMutex
}
26
// bucketKeys groups the bucket names a Q works with. Each field is the raw
// key handed to bolt when the corresponding bucket is created or looked up.
// In this file the ready, unacked and delayed keys are resolved beneath the
// queue's top-level bucket; the remaining keys presumably follow the same
// pattern (their use is not visible here).
type bucketKeys struct {
	ready, returned, unacked, delayed []byte
	waiting, blockedOn, blocking      []byte
}
36
37// Close closes q. When q is closed, Send, Receive, and Close will return
38// ErrQClosed. Close blocks until all messages in the "unacked" state are Acked
39// or Nacked.
40func (q *Q) Close() error {
41	q.messages.Lock()
42	defer q.messages.Unlock()
43	select {
44	case <-q.closed:
45		return ErrQClosed
46	default:
47		close(q.closed)
48	}
49	q.inFlight.Wait()
50	return q.equilibrate()
51}
52
53func (q *Q) isClosed() bool {
54	select {
55	case <-q.closed:
56		return true
57	default:
58		return false
59	}
60}
61
62func (q *Q) String() string {
63	return fmt.Sprintf("Q{Name: %q}", string(q.name))
64}
65
66// NewQ creates a new Q. Only one queue should be created in a given bolt db,
67// unless compaction is disabled.
68func NewQ(db *bolt.DB, name string, options ...Option) (*Q, error) {
69	bName := []byte(name)
70	closed := make(chan struct{})
71	q := &Q{
72		db:   db,
73		name: bName,
74		keys: bucketKeys{
75			ready:     []byte("ready"),
76			unacked:   []byte("unacked"),
77			delayed:   []byte("delayed"),
78			waiting:   []byte("waiting"),
79			blockedOn: []byte("blockedOn"),
80			blocking:  []byte("blocking"),
81		},
82		waker:  newWaker(closed),
83		closed: closed,
84	}
85	for _, o := range options {
86		if err := o(q); err != nil {
87			return nil, fmt.Errorf("lasr: couldn't create Q: %s", err)
88		}
89	}
90	q.optsApplied = true
91	if err := q.init(); err != nil {
92		return nil, err
93	}
94	return q, nil
95}
96
97func (q *Q) init() error {
98	if q.messages == nil {
99		q.messages = newFifo(1)
100	}
101	return q.equilibrate()
102}
103
104func (q *Q) equilibrate() error {
105	q.mu.RLock()
106	defer q.mu.RUnlock()
107	return q.db.Update(func(tx *bolt.Tx) error {
108		bucket, err := q.bucket(tx, q.keys.ready)
109		if err != nil {
110			return err
111		}
112		readyKeys := bucket.Stats().KeyN
113		unacked, err := q.bucket(tx, q.keys.unacked)
114		if err != nil {
115			return err
116		}
117		cursor := unacked.Cursor()
118		// put unacked messages from previous session back in the queue
119		for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
120			if err := bucket.Put(k, v); err != nil {
121				return err
122			}
123			readyKeys++
124		}
125		if readyKeys > 0 && !q.isClosed() {
126			q.waker.Wake()
127		}
128		q.messages.Drain()
129		root, err := tx.CreateBucketIfNotExists(q.name)
130		if err != nil {
131			return err
132		}
133		if len(q.keys.delayed) > 0 && !q.isClosed() {
134			// WakeAt for all delayed messages
135			delayed, err := q.bucket(tx, q.keys.delayed)
136			if err != nil {
137				return err
138			}
139			delayC := delayed.Cursor()
140			for k, _ := delayC.First(); k != nil; k, _ = delayC.Next() {
141				var id Uint64ID
142				if err := id.UnmarshalBinary(k); err != nil {
143					return fmt.Errorf("error reading delayed key %v: %s", k, err)
144				}
145				q.waker.WakeAt(time.Unix(0, int64(id)))
146			}
147		}
148		// Delete the unacked bucket now that the unacked messages have been
149		// returned to the ready bucket.
150		return root.DeleteBucket(q.keys.unacked)
151	})
152}
153
// bucketer describes a value that can look up or create a child bucket by
// key. The method names and signatures mirror those that bolt exposes on its
// transaction and bucket types. It is not referenced by the code visible in
// this part of the file.
type bucketer interface {
	// CreateBucketIfNotExists returns the bucket stored under the given key,
	// creating it first when it does not exist yet.
	CreateBucketIfNotExists([]byte) (*bolt.Bucket, error)
	// Bucket returns the bucket stored under the given key.
	// NOTE(review): bolt's own Bucket methods return nil for a missing
	// bucket — confirm implementations here do the same.
	Bucket([]byte) *bolt.Bucket
}
158
159func (q *Q) bucket(tx *bolt.Tx, key []byte) (*bolt.Bucket, error) {
160	bucket, err := tx.CreateBucketIfNotExists(q.name)
161	if err != nil {
162		return nil, err
163	}
164	bucket, err = bucket.CreateBucketIfNotExists(key)
165	if err != nil {
166		return nil, err
167	}
168	return bucket, nil
169}
170