1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 //
4 // This software is supplied under the terms of the MIT License, a
5 // copy of which should be located in the distribution where this
6 // file was obtained (LICENSE.txt). A copy of the license may also be
7 // found online at https://opensource.org/licenses/MIT.
8 //
9
10 #include "nng_impl.h"
11
// Light-weight message queue. These are derived from our heavy-weight
// message queues; they are less "featureful", but more useful in
// performance-sensitive contexts. Locking must be done by the caller.
15
16 int
nni_lmq_init(nni_lmq * lmq,size_t cap)17 nni_lmq_init(nni_lmq *lmq, size_t cap)
18 {
19 size_t alloc;
20
21 // We prefer alloc to a power of 2, this allows us to do modulo
22 // operations as a power of two, for efficiency. It does possibly
23 // waste some space, but never more than 2x. Consumers should try
24 // for powers of two if they are concerned about efficiency.
25 alloc = 2;
26 while (alloc < cap) {
27 alloc *= 2;
28 }
29 if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
30 NNI_FREE_STRUCT(lmq);
31 return (NNG_ENOMEM);
32 }
33 lmq->lmq_cap = cap;
34 lmq->lmq_alloc = alloc;
35 lmq->lmq_mask = (alloc - 1);
36 lmq->lmq_len = 0;
37 lmq->lmq_get = 0;
38 lmq->lmq_put = 0;
39
40 return (0);
41 }
42
43 void
nni_lmq_fini(nni_lmq * lmq)44 nni_lmq_fini(nni_lmq *lmq)
45 {
46 if (lmq == NULL) {
47 return;
48 }
49
50 /* Free any orphaned messages. */
51 while (lmq->lmq_len > 0) {
52 nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
53 lmq->lmq_get &= lmq->lmq_mask;
54 lmq->lmq_len--;
55 nni_msg_free(msg);
56 }
57
58 nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
59 }
60
61 void
nni_lmq_flush(nni_lmq * lmq)62 nni_lmq_flush(nni_lmq *lmq)
63 {
64 while (lmq->lmq_len > 0) {
65 nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
66 lmq->lmq_get &= lmq->lmq_mask;
67 lmq->lmq_len--;
68 nni_msg_free(msg);
69 }
70 }
71
72 size_t
nni_lmq_len(nni_lmq * lmq)73 nni_lmq_len(nni_lmq *lmq)
74 {
75 return (lmq->lmq_len);
76 }
77
78 size_t
nni_lmq_cap(nni_lmq * lmq)79 nni_lmq_cap(nni_lmq *lmq)
80 {
81 return (lmq->lmq_cap);
82 }
83
84 bool
nni_lmq_full(nni_lmq * lmq)85 nni_lmq_full(nni_lmq *lmq)
86 {
87 return (lmq->lmq_len >= lmq->lmq_cap);
88 }
89
90 bool
nni_lmq_empty(nni_lmq * lmq)91 nni_lmq_empty(nni_lmq *lmq)
92 {
93 return (lmq->lmq_len == 0);
94 }
95
96 int
nni_lmq_putq(nni_lmq * lmq,nng_msg * msg)97 nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
98 {
99 if (lmq->lmq_len >= lmq->lmq_cap) {
100 return (NNG_EAGAIN);
101 }
102 lmq->lmq_msgs[lmq->lmq_put++] = msg;
103 lmq->lmq_len++;
104 lmq->lmq_put &= lmq->lmq_mask;
105 return (0);
106 }
107
108 int
nni_lmq_getq(nni_lmq * lmq,nng_msg ** msgp)109 nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
110 {
111 nng_msg *msg;
112 if (lmq->lmq_len == 0) {
113 return (NNG_EAGAIN);
114 }
115 msg = lmq->lmq_msgs[lmq->lmq_get++];
116 lmq->lmq_get &= lmq->lmq_mask;
117 lmq->lmq_len--;
118 *msgp = msg;
119 return (0);
120 }
121
122 int
nni_lmq_resize(nni_lmq * lmq,size_t cap)123 nni_lmq_resize(nni_lmq *lmq, size_t cap)
124 {
125 nng_msg * msg;
126 nng_msg **newq;
127 size_t alloc;
128 size_t len;
129
130 alloc = 2;
131 while (alloc < cap) {
132 alloc *= 2;
133 }
134
135 newq = nni_alloc(sizeof(nng_msg *) * alloc);
136 if (newq == NULL) {
137 return (NNG_ENOMEM);
138 }
139
140 len = 0;
141 while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
142 newq[len++] = msg;
143 }
144
145 // Flush anything left over.
146 nni_lmq_flush(lmq);
147
148 nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
149 lmq->lmq_msgs = newq;
150 lmq->lmq_cap = cap;
151 lmq->lmq_alloc = alloc;
152 lmq->lmq_mask = alloc - 1;
153 lmq->lmq_len = len;
154 lmq->lmq_put = len;
155 lmq->lmq_get = 0;
156
157 return (0);
158 }
159