1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_APP_QUEUE_H_INCLUDED_
7 #define _NXT_APP_QUEUE_H_INCLUDED_
8 
9 
10 #include <nxt_app_nncq.h>
11 
12 
/* Using Numeric Naive Circular Queue as a backend. */

/* Number of message slots in a queue; equals the capacity of the NNCQ. */
#define NXT_APP_QUEUE_SIZE      NXT_APP_NNCQ_SIZE
/*
 * Maximum payload bytes stored in one slot.  Together with the one-byte
 * length field of nxt_app_queue_item_t this makes 32 bytes.
 */
#define NXT_APP_QUEUE_MSG_SIZE  31
17 
/*
 * One message slot.  Slots are addressed by their index in
 * nxt_app_queue_t.items; that index is what travels through the NNCQs
 * and what callers receive as the "cookie".
 */
typedef struct {
    /* Number of valid bytes in data[], as set by nxt_app_queue_send(). */
    uint8_t   size;
    /* Message payload; only the first 'size' bytes are meaningful. */
    uint8_t   data[NXT_APP_QUEUE_MSG_SIZE];
    /*
     * Caller-supplied value stored on send.  nxt_app_queue_cancel()
     * compare-and-sets it to 0 when it still matches the expected value.
     */
    uint32_t  tracking;
} nxt_app_queue_item_t;
23 
24 
/*
 * Fixed-capacity message queue: a pool of slots plus two index queues
 * that hand slot indices back and forth between sender and receiver.
 */
typedef struct {
    /*
     * Notification flag: nxt_app_queue_send() tries to switch it from
     * 0 to 1, nxt_app_queue_notification_received() resets it to 0.
     */
    nxt_app_nncq_atomic_t  notified;
    /* Indices of slots that are currently unused. */
    nxt_app_nncq_t         free_items;
    /* Indices of slots that hold a sent, not yet received message. */
    nxt_app_nncq_t         queue;
    /* Slot storage, indexed by the values kept in the two queues. */
    nxt_app_queue_item_t   items[NXT_APP_QUEUE_SIZE];
} nxt_app_queue_t;
31 
32 
33 nxt_inline void
nxt_app_queue_init(nxt_app_queue_t volatile * q)34 nxt_app_queue_init(nxt_app_queue_t volatile *q)
35 {
36     nxt_app_nncq_atomic_t  i;
37 
38     nxt_app_nncq_init(&q->free_items);
39     nxt_app_nncq_init(&q->queue);
40 
41     for (i = 0; i < NXT_APP_QUEUE_SIZE; i++) {
42         nxt_app_nncq_enqueue(&q->free_items, i);
43     }
44 
45     q->notified = 0;
46 }
47 
48 
49 nxt_inline nxt_int_t
nxt_app_queue_send(nxt_app_queue_t volatile * q,const void * p,uint8_t size,uint32_t tracking,int * notify,uint32_t * cookie)50 nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
51     uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
52 {
53     int                    n;
54     nxt_app_queue_item_t   *qi;
55     nxt_app_nncq_atomic_t  i;
56 
57     i = nxt_app_nncq_dequeue(&q->free_items);
58     if (i == nxt_app_nncq_empty(&q->free_items)) {
59         return NXT_AGAIN;
60     }
61 
62     qi = (nxt_app_queue_item_t *) &q->items[i];
63 
64     qi->size = size;
65     nxt_memcpy(qi->data, p, size);
66     qi->tracking = tracking;
67     *cookie = i;
68 
69     nxt_app_nncq_enqueue(&q->queue, i);
70 
71     n = nxt_atomic_cmp_set(&q->notified, 0, 1);
72 
73     if (notify != NULL) {
74         *notify = n;
75     }
76 
77     return NXT_OK;
78 }
79 
80 
81 nxt_inline void
nxt_app_queue_notification_received(nxt_app_queue_t volatile * q)82 nxt_app_queue_notification_received(nxt_app_queue_t volatile *q)
83 {
84     q->notified = 0;
85 }
86 
87 
88 nxt_inline nxt_bool_t
nxt_app_queue_cancel(nxt_app_queue_t volatile * q,uint32_t cookie,uint32_t tracking)89 nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
90     uint32_t tracking)
91 {
92     nxt_app_queue_item_t  *qi;
93 
94     qi = (nxt_app_queue_item_t *) &q->items[cookie];
95 
96     return nxt_atomic_cmp_set(&qi->tracking, tracking, 0);
97 }
98 
99 
100 nxt_inline ssize_t
nxt_app_queue_recv(nxt_app_queue_t volatile * q,void * p,uint32_t * cookie)101 nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
102 {
103     ssize_t                res;
104     nxt_app_queue_item_t   *qi;
105     nxt_app_nncq_atomic_t  i;
106 
107     i = nxt_app_nncq_dequeue(&q->queue);
108     if (i == nxt_app_nncq_empty(&q->queue)) {
109         *cookie = 0;
110         return -1;
111     }
112 
113     qi = (nxt_app_queue_item_t *) &q->items[i];
114 
115     res = qi->size;
116     nxt_memcpy(p, qi->data, qi->size);
117     *cookie = i;
118 
119     nxt_app_nncq_enqueue(&q->free_items, i);
120 
121     return res;
122 }
123 
124 
125 #endif /* _NXT_APP_QUEUE_H_INCLUDED_ */
126