1 #include "stdinc.h"
2 #include "dat.h"
3 #include "fns.h"
4
5 typedef struct LumpQueue LumpQueue;
6 typedef struct WLump WLump;
7
enum
{
	/*
	 * size of the per-queue ring.  indices are advanced with
	 * "& (MaxLumpQ - 1)", hence the power-of-two requirement.
	 * the ring is treated as full when r == w+1 (mod MaxLumpQ),
	 * so at most MaxLumpQ-1 entries are queued at once.
	 */
	MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */
};
12
/*
 * one pending write: the arguments given to queuewrite,
 * saved until queueproc hands them to writeqlump.
 */
struct WLump
{
	Lump *u;	/* lump to write; queueproc calls putlump on it after the write */
	Packet *p;	/* packet data passed along with u */
	int creator;	/* passed through to writeqlump unchanged */
	int gen;	/* value of the global gen when the entry was queued */
	uint ms;	/* passed through to writeqlump unchanged */
};
21
/*
 * a ring of pending writes serviced by one queueproc.
 * initlumpqueues ties all three Rendez to lock.
 */
struct LumpQueue
{
	QLock lock;	/* held while reading or changing q, w and r */
	Rendez flush;	/* flushqueue sleeps here; queueproc wakes all sleepers */
	Rendez full;	/* queuewrite sleeps here while the ring is full */
	Rendez empty;	/* queueproc sleeps here while the ring is empty */
	WLump q[MaxLumpQ];	/* the ring itself */
	int w;	/* index of the next slot queuewrite fills */
	int r;	/* index of the next slot queueproc takes; w == r means empty */
};
32
/* the write queues and how many there are; both set by initlumpqueues */
static LumpQueue *lumpqs;
static int nqs;

/*
 * gen is the flush generation: flushqueue increments it while holding glk,
 * queuewrite stamps it into each queued entry, and flushqueue compares
 * the stamp at the head of each queue against it.
 */
static QLock glk;
static int gen;

/* per-queue writer proc, started by initlumpqueues */
static void queueproc(void *vq);
40
41 int
initlumpqueues(int nq)42 initlumpqueues(int nq)
43 {
44 LumpQueue *q;
45
46 int i;
47 nqs = nq;
48
49 lumpqs = MKNZ(LumpQueue, nq);
50
51 for(i = 0; i < nq; i++){
52 q = &lumpqs[i];
53 q->full.l = &q->lock;
54 q->empty.l = &q->lock;
55 q->flush.l = &q->lock;
56
57 if(vtproc(queueproc, q) < 0){
58 seterr(EOk, "can't start write queue slave: %r");
59 return -1;
60 }
61 }
62
63 return 0;
64 }
65
66 /*
67 * queue a lump & it's packet data for writing
68 */
69 int
queuewrite(Lump * u,Packet * p,int creator,uint ms)70 queuewrite(Lump *u, Packet *p, int creator, uint ms)
71 {
72 LumpQueue *q;
73 int i;
74
75 trace(TraceProc, "queuewrite");
76 i = indexsect(mainindex, u->score);
77 if(i < 0 || i >= nqs){
78 seterr(EBug, "internal error: illegal index section in queuewrite");
79 return -1;
80 }
81
82 q = &lumpqs[i];
83
84 qlock(&q->lock);
85 while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
86 trace(TraceProc, "queuewrite sleep");
87 rsleep(&q->full);
88 }
89
90 q->q[q->w].u = u;
91 q->q[q->w].p = p;
92 q->q[q->w].creator = creator;
93 q->q[q->w].ms = ms;
94 q->q[q->w].gen = gen;
95 q->w = (q->w + 1) & (MaxLumpQ - 1);
96
97 trace(TraceProc, "queuewrite wakeup");
98 rwakeup(&q->empty);
99
100 qunlock(&q->lock);
101
102 return 0;
103 }
104
105 void
flushqueue(void)106 flushqueue(void)
107 {
108 int i;
109 LumpQueue *q;
110
111 if(!lumpqs)
112 return;
113
114 trace(TraceProc, "flushqueue");
115
116 qlock(&glk);
117 gen++;
118 qunlock(&glk);
119
120 for(i=0; i<mainindex->nsects; i++){
121 q = &lumpqs[i];
122 qlock(&q->lock);
123 while(q->w != q->r && gen - q->q[q->r].gen > 0){
124 trace(TraceProc, "flushqueue sleep q%d", i);
125 rsleep(&q->flush);
126 }
127 qunlock(&q->lock);
128 }
129 }
130
131 static void
queueproc(void * vq)132 queueproc(void *vq)
133 {
134 LumpQueue *q;
135 Lump *u;
136 Packet *p;
137 int creator;
138 uint ms;
139
140 threadsetname("queueproc");
141
142 q = vq;
143 for(;;){
144 qlock(&q->lock);
145 while(q->w == q->r){
146 trace(TraceProc, "queueproc sleep empty");
147 rsleep(&q->empty);
148 }
149
150 u = q->q[q->r].u;
151 p = q->q[q->r].p;
152 creator = q->q[q->r].creator;
153 ms = q->q[q->r].ms;
154
155 q->r = (q->r + 1) & (MaxLumpQ - 1);
156 trace(TraceProc, "queueproc wakeup flush");
157 rwakeupall(&q->flush);
158
159 trace(TraceProc, "queueproc wakeup full");
160 rwakeup(&q->full);
161
162 qunlock(&q->lock);
163
164 trace(TraceProc, "queueproc writelump %V", u->score);
165 if(writeqlump(u, p, creator, ms) < 0)
166 fprint(2, "failed to write lump for %V: %r", u->score);
167 trace(TraceProc, "queueproc wrotelump %V", u->score);
168
169 putlump(u);
170 }
171 }
172