1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #ifndef _NXT_APP_QUEUE_H_INCLUDED_
7 #define _NXT_APP_QUEUE_H_INCLUDED_
8
9
10 #include <nxt_app_nncq.h>
11
12
13 /* Using Numeric Naive Circular Queue as a backend. */
14
15 #define NXT_APP_QUEUE_SIZE NXT_APP_NNCQ_SIZE
16 #define NXT_APP_QUEUE_MSG_SIZE 31
17
18 typedef struct {
19 uint8_t size;
20 uint8_t data[NXT_APP_QUEUE_MSG_SIZE];
21 uint32_t tracking;
22 } nxt_app_queue_item_t;
23
24
25 typedef struct {
26 nxt_app_nncq_atomic_t notified;
27 nxt_app_nncq_t free_items;
28 nxt_app_nncq_t queue;
29 nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE];
30 } nxt_app_queue_t;
31
32
33 nxt_inline void
nxt_app_queue_init(nxt_app_queue_t volatile * q)34 nxt_app_queue_init(nxt_app_queue_t volatile *q)
35 {
36 nxt_app_nncq_atomic_t i;
37
38 nxt_app_nncq_init(&q->free_items);
39 nxt_app_nncq_init(&q->queue);
40
41 for (i = 0; i < NXT_APP_QUEUE_SIZE; i++) {
42 nxt_app_nncq_enqueue(&q->free_items, i);
43 }
44
45 q->notified = 0;
46 }
47
48
49 nxt_inline nxt_int_t
nxt_app_queue_send(nxt_app_queue_t volatile * q,const void * p,uint8_t size,uint32_t tracking,int * notify,uint32_t * cookie)50 nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
51 uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
52 {
53 int n;
54 nxt_app_queue_item_t *qi;
55 nxt_app_nncq_atomic_t i;
56
57 i = nxt_app_nncq_dequeue(&q->free_items);
58 if (i == nxt_app_nncq_empty(&q->free_items)) {
59 return NXT_AGAIN;
60 }
61
62 qi = (nxt_app_queue_item_t *) &q->items[i];
63
64 qi->size = size;
65 nxt_memcpy(qi->data, p, size);
66 qi->tracking = tracking;
67 *cookie = i;
68
69 nxt_app_nncq_enqueue(&q->queue, i);
70
71 n = nxt_atomic_cmp_set(&q->notified, 0, 1);
72
73 if (notify != NULL) {
74 *notify = n;
75 }
76
77 return NXT_OK;
78 }
79
80
81 nxt_inline void
nxt_app_queue_notification_received(nxt_app_queue_t volatile * q)82 nxt_app_queue_notification_received(nxt_app_queue_t volatile *q)
83 {
84 q->notified = 0;
85 }
86
87
88 nxt_inline nxt_bool_t
nxt_app_queue_cancel(nxt_app_queue_t volatile * q,uint32_t cookie,uint32_t tracking)89 nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
90 uint32_t tracking)
91 {
92 nxt_app_queue_item_t *qi;
93
94 qi = (nxt_app_queue_item_t *) &q->items[cookie];
95
96 return nxt_atomic_cmp_set(&qi->tracking, tracking, 0);
97 }
98
99
100 nxt_inline ssize_t
nxt_app_queue_recv(nxt_app_queue_t volatile * q,void * p,uint32_t * cookie)101 nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
102 {
103 ssize_t res;
104 nxt_app_queue_item_t *qi;
105 nxt_app_nncq_atomic_t i;
106
107 i = nxt_app_nncq_dequeue(&q->queue);
108 if (i == nxt_app_nncq_empty(&q->queue)) {
109 *cookie = 0;
110 return -1;
111 }
112
113 qi = (nxt_app_queue_item_t *) &q->items[i];
114
115 res = qi->size;
116 nxt_memcpy(p, qi->data, qi->size);
117 *cookie = i;
118
119 nxt_app_nncq_enqueue(&q->free_items, i);
120
121 return res;
122 }
123
124
125 #endif /* _NXT_APP_QUEUE_H_INCLUDED_ */
126