1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 //
4 // This software is supplied under the terms of the MIT License, a
5 // copy of which should be located in the distribution where this
6 // file was obtained (LICENSE.txt).  A copy of the license may also be
7 // found online at https://opensource.org/licenses/MIT.
8 //
9 
10 #include "nng_impl.h"
11 
12 // Light-weight message queue. These are derived from our heavy-weight
13 // message queues, but are less "featureful", but more useful for
14 // performance sensitive contexts.  Locking must be done by the caller.
15 
16 int
nni_lmq_init(nni_lmq * lmq,size_t cap)17 nni_lmq_init(nni_lmq *lmq, size_t cap)
18 {
19 	size_t alloc;
20 
21 	// We prefer alloc to a power of 2, this allows us to do modulo
22 	// operations as a power of two, for efficiency.  It does possibly
23 	// waste some space, but never more than 2x.  Consumers should try
24 	// for powers of two if they are concerned about efficiency.
25 	alloc = 2;
26 	while (alloc < cap) {
27 		alloc *= 2;
28 	}
29 	if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
30 		NNI_FREE_STRUCT(lmq);
31 		return (NNG_ENOMEM);
32 	}
33 	lmq->lmq_cap   = cap;
34 	lmq->lmq_alloc = alloc;
35 	lmq->lmq_mask  = (alloc - 1);
36 	lmq->lmq_len   = 0;
37 	lmq->lmq_get   = 0;
38 	lmq->lmq_put   = 0;
39 
40 	return (0);
41 }
42 
43 void
nni_lmq_fini(nni_lmq * lmq)44 nni_lmq_fini(nni_lmq *lmq)
45 {
46 	if (lmq == NULL) {
47 		return;
48 	}
49 
50 	/* Free any orphaned messages. */
51 	while (lmq->lmq_len > 0) {
52 		nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
53 		lmq->lmq_get &= lmq->lmq_mask;
54 		lmq->lmq_len--;
55 		nni_msg_free(msg);
56 	}
57 
58 	nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
59 }
60 
61 void
nni_lmq_flush(nni_lmq * lmq)62 nni_lmq_flush(nni_lmq *lmq)
63 {
64 	while (lmq->lmq_len > 0) {
65 		nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
66 		lmq->lmq_get &= lmq->lmq_mask;
67 		lmq->lmq_len--;
68 		nni_msg_free(msg);
69 	}
70 }
71 
72 size_t
nni_lmq_len(nni_lmq * lmq)73 nni_lmq_len(nni_lmq *lmq)
74 {
75 	return (lmq->lmq_len);
76 }
77 
78 size_t
nni_lmq_cap(nni_lmq * lmq)79 nni_lmq_cap(nni_lmq *lmq)
80 {
81 	return (lmq->lmq_cap);
82 }
83 
84 bool
nni_lmq_full(nni_lmq * lmq)85 nni_lmq_full(nni_lmq *lmq)
86 {
87 	return (lmq->lmq_len >= lmq->lmq_cap);
88 }
89 
90 bool
nni_lmq_empty(nni_lmq * lmq)91 nni_lmq_empty(nni_lmq *lmq)
92 {
93 	return (lmq->lmq_len == 0);
94 }
95 
96 int
nni_lmq_putq(nni_lmq * lmq,nng_msg * msg)97 nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
98 {
99 	if (lmq->lmq_len >= lmq->lmq_cap) {
100 		return (NNG_EAGAIN);
101 	}
102 	lmq->lmq_msgs[lmq->lmq_put++] = msg;
103 	lmq->lmq_len++;
104 	lmq->lmq_put &= lmq->lmq_mask;
105 	return (0);
106 }
107 
108 int
nni_lmq_getq(nni_lmq * lmq,nng_msg ** msgp)109 nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
110 {
111 	nng_msg *msg;
112 	if (lmq->lmq_len == 0) {
113 		return (NNG_EAGAIN);
114 	}
115 	msg = lmq->lmq_msgs[lmq->lmq_get++];
116 	lmq->lmq_get &= lmq->lmq_mask;
117 	lmq->lmq_len--;
118 	*msgp = msg;
119 	return (0);
120 }
121 
122 int
nni_lmq_resize(nni_lmq * lmq,size_t cap)123 nni_lmq_resize(nni_lmq *lmq, size_t cap)
124 {
125 	nng_msg * msg;
126 	nng_msg **newq;
127 	size_t    alloc;
128 	size_t    len;
129 
130 	alloc = 2;
131 	while (alloc < cap) {
132 		alloc *= 2;
133 	}
134 
135 	newq = nni_alloc(sizeof(nng_msg *) * alloc);
136 	if (newq == NULL) {
137 		return (NNG_ENOMEM);
138 	}
139 
140 	len = 0;
141 	while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
142 		newq[len++] = msg;
143 	}
144 
145 	// Flush anything left over.
146 	nni_lmq_flush(lmq);
147 
148 	nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
149 	lmq->lmq_msgs  = newq;
150 	lmq->lmq_cap   = cap;
151 	lmq->lmq_alloc = alloc;
152 	lmq->lmq_mask  = alloc - 1;
153 	lmq->lmq_len   = len;
154 	lmq->lmq_put   = len;
155 	lmq->lmq_get   = 0;
156 
157 	return (0);
158 }
159