1 #include "stdinc.h"
2 #include "dat.h"
3 #include "fns.h"
4 
5 typedef struct LumpQueue	LumpQueue;
6 typedef struct WLump		WLump;
7 
8 enum
9 {
10 	MaxLumpQ	= 1 << 3	/* max. lumps on a single write queue, must be pow 2 */
11 };
12 
13 struct WLump
14 {
15 	Lump	*u;
16 	Packet	*p;
17 	int	creator;
18 	int	gen;
19 	uint	ms;
20 };
21 
22 struct LumpQueue
23 {
24 	QLock	lock;
25 	Rendez 	flush;
26 	Rendez	full;
27 	Rendez	empty;
28 	WLump	q[MaxLumpQ];
29 	int	w;
30 	int	r;
31 };
32 
33 static LumpQueue	*lumpqs;
34 static int		nqs;
35 
36 static QLock		glk;
37 static int		gen;
38 
39 static void	queueproc(void *vq);
40 
41 int
initlumpqueues(int nq)42 initlumpqueues(int nq)
43 {
44 	LumpQueue *q;
45 
46 	int i;
47 	nqs = nq;
48 
49 	lumpqs = MKNZ(LumpQueue, nq);
50 
51 	for(i = 0; i < nq; i++){
52 		q = &lumpqs[i];
53 		q->full.l = &q->lock;
54 		q->empty.l = &q->lock;
55 		q->flush.l = &q->lock;
56 
57 		if(vtproc(queueproc, q) < 0){
58 			seterr(EOk, "can't start write queue slave: %r");
59 			return -1;
60 		}
61 	}
62 
63 	return 0;
64 }
65 
66 /*
67  * queue a lump & it's packet data for writing
68  */
69 int
queuewrite(Lump * u,Packet * p,int creator,uint ms)70 queuewrite(Lump *u, Packet *p, int creator, uint ms)
71 {
72 	LumpQueue *q;
73 	int i;
74 
75 	trace(TraceProc, "queuewrite");
76 	i = indexsect(mainindex, u->score);
77 	if(i < 0 || i >= nqs){
78 		seterr(EBug, "internal error: illegal index section in queuewrite");
79 		return -1;
80 	}
81 
82 	q = &lumpqs[i];
83 
84 	qlock(&q->lock);
85 	while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
86 		trace(TraceProc, "queuewrite sleep");
87 		rsleep(&q->full);
88 	}
89 
90 	q->q[q->w].u = u;
91 	q->q[q->w].p = p;
92 	q->q[q->w].creator = creator;
93 	q->q[q->w].ms = ms;
94 	q->q[q->w].gen = gen;
95 	q->w = (q->w + 1) & (MaxLumpQ - 1);
96 
97 	trace(TraceProc, "queuewrite wakeup");
98 	rwakeup(&q->empty);
99 
100 	qunlock(&q->lock);
101 
102 	return 0;
103 }
104 
105 void
flushqueue(void)106 flushqueue(void)
107 {
108 	int i;
109 	LumpQueue *q;
110 
111 	if(!lumpqs)
112 		return;
113 
114 	trace(TraceProc, "flushqueue");
115 
116 	qlock(&glk);
117 	gen++;
118 	qunlock(&glk);
119 
120 	for(i=0; i<mainindex->nsects; i++){
121 		q = &lumpqs[i];
122 		qlock(&q->lock);
123 		while(q->w != q->r && gen - q->q[q->r].gen > 0){
124 			trace(TraceProc, "flushqueue sleep q%d", i);
125 			rsleep(&q->flush);
126 		}
127 		qunlock(&q->lock);
128 	}
129 }
130 
131 static void
queueproc(void * vq)132 queueproc(void *vq)
133 {
134 	LumpQueue *q;
135 	Lump *u;
136 	Packet *p;
137 	int creator;
138 	uint ms;
139 
140 	threadsetname("queueproc");
141 
142 	q = vq;
143 	for(;;){
144 		qlock(&q->lock);
145 		while(q->w == q->r){
146 			trace(TraceProc, "queueproc sleep empty");
147 			rsleep(&q->empty);
148 		}
149 
150 		u = q->q[q->r].u;
151 		p = q->q[q->r].p;
152 		creator = q->q[q->r].creator;
153 		ms = q->q[q->r].ms;
154 
155 		q->r = (q->r + 1) & (MaxLumpQ - 1);
156 		trace(TraceProc, "queueproc wakeup flush");
157 		rwakeupall(&q->flush);
158 
159 		trace(TraceProc, "queueproc wakeup full");
160 		rwakeup(&q->full);
161 
162 		qunlock(&q->lock);
163 
164 		trace(TraceProc, "queueproc writelump %V", u->score);
165 		if(writeqlump(u, p, creator, ms) < 0)
166 			fprint(2, "failed to write lump for %V: %r", u->score);
167 		trace(TraceProc, "queueproc wrotelump %V", u->score);
168 
169 		putlump(u);
170 	}
171 }
172