1 #include <u.h>
2 #include <libc.h>
3 #include <bio.h>
4 #include <thread.h>
5 #include <ixp.h>
6 
7 extern char *(*_syserrstr)(void);
8 char *path;
9 
10 enum {
11 	DATA = 1,
12 	THREAD = 2,
13 	TRACE = 4,
14 	READ = 8,
15 };
16 int chatty = READ;
17 
18 typedef struct arg arg;
19 typedef struct arg2 arg2;
20 struct arg {
21 	Rendez r;
22 	IxpCFid *f;
23 	Channel *ch;
24 	int j, k;
25 };
26 
27 struct arg2 {
28 	Rendez r;
29 	IxpClient *c;
30 	Channel *ch;
31 	int j;
32 };
33 
34 Channel *chan;
35 int nproc;
36 
37 void
spawn(void (* f)(void *),void * v,int s)38 spawn(void(*f)(void*), void *v, int s) {
39 	nproc++;
40 	proccreate(f, v, s);
41 }
42 
43 void
_print(Biobuf * b,char * p,char * end,int j,int k)44 _print(Biobuf *b, char *p, char *end, int j, int k) {
45 	for(; p < end; p++) {
46 		Bputc(b, *p);
47 		if(*p == '\n') {
48 			Bflush(b);
49 			Bprint(b, ":: %d %d: ", j, k);
50 		}
51 	}
52 }
53 
54 void
readfile(IxpCFid * f,int j,int k)55 readfile(IxpCFid *f, int j, int k) {
56 	Biobuf *b;
57 	char *buf;
58 	int n;
59 
60 	if(chatty&TRACE)
61 		fprint(2, "readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit);
62 
63 	b = Bfdopen(dup(1, -1), OWRITE);
64 	if(chatty&DATA)
65 		Bprint(b, ":: %d %d: ", j, k);
66 
67 	buf = ixp_emalloc(f->iounit);
68 	while((n = ixp_read(f, buf, f->iounit)) > 0) {
69 		if(chatty&READ)
70 			fprint(2, "+readfile(%p, %d, %d) n=%d\n", f, j, k, n);
71 		if(chatty&DATA)
72 			_print(b, buf, buf+n, j, k);
73 		sleep(0);
74 	}
75 
76 	if(chatty&TRACE)
77 		fprint(2, "-readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit);
78 	if(chatty&DATA)
79 		Bputc(b, '\n');
80 	Bterm(b);
81 }
82 
83 static void
_read(void * p)84 _read(void *p) {
85 	arg *a;
86 	int k;
87 
88 	a = p;
89 	k = a->k;
90 	if(chatty&THREAD)
91 		print("Start _read: %d\n", a->j, k);
92 
93 	qlock(a->r.l);
94 	sendul(a->ch, 0);
95 	rsleep(&a->r);
96 	if(chatty&THREAD)
97 		print("Wake _read: %d\n", a->j, k);
98 	qunlock(a->r.l);
99 
100 	readfile(a->f, a->j, k);
101 	sendul(chan, 0);
102 }
103 
104 static void
_open(void * p)105 _open(void *p) {
106 	arg2 *a2;
107 	arg *a;
108 
109 	a = malloc(sizeof *a);
110 
111 	a2 = p;
112 
113 	a->j = a2->j;
114 	memset(&a->r, 0, sizeof(a->r));
115 	a->r.l = mallocz(sizeof(QLock), 1);
116 	a->ch = chancreate(sizeof(ulong), 0);
117 
118 	if(chatty&THREAD)
119 		print("Start _open: %d\n", a2->j);
120 
121 	qlock(a2->r.l);
122 	sendul(a2->ch, 0);
123 	rsleep(&a2->r);
124 	if(chatty&THREAD)
125 		print("Wake _open: %d\n", a2->j);
126 	qunlock(a2->r.l);
127 
128 	a->f = ixp_open(a2->c, path, OREAD);
129 	if(a->f == nil)
130 		sysfatal("can't open %q: %r\n", path);
131 	sleep(0);
132 
133 	for(a->k = 0; a->k < 5; a->k++) {
134 		spawn(_read, a, mainstacksize);
135 		recvul(a->ch);
136 	}
137 
138 	qlock(a->r.l);
139 	rwakeupall(&a->r);
140 	qunlock(a->r.l);
141 
142 	sendul(chan, 0);
143 }
144 
145 const char *_malloc_options = "A";
146 
147 void
threadmain(int argc,char * argv[])148 threadmain(int argc, char *argv[]) {
149 	arg2 *a;
150 	char *address;
151 
152 	USED(argc, argv);
153 	address = "unix!/tmp/ns.kris.:0/wmii";
154 	address = "tcp!localhost!6663";
155 	path = "/n/local/var/log/messages";
156 
157 	a = malloc(sizeof *a);
158 	chan = chancreate(sizeof(ulong), 0);
159 
160 	quotefmtinstall();
161 
162 	_syserrstr = ixp_errbuf;
163 	if(ixp_pthread_init())
164 		sysfatal("can't init pthread: %r\n");
165 
166 	a->c = ixp_mount(address);
167 	if(a->c == nil)
168 		sysfatal("can't mount: %r\n");
169 
170 	memset(&a->r, 0, sizeof(a->r));
171 	a->r.l = mallocz(sizeof(QLock), 1);
172 	a->ch = chancreate(sizeof(ulong), 0);
173 	for(a->j = 0; a->j < 5; a->j++) {
174 		spawn(_open, a, mainstacksize);
175 		recvul(a->ch);
176 	}
177 
178 	qlock(a->r.l);
179 	if(chatty&THREAD)
180 		fprint(2, "qlock()\n");
181 	rwakeupall(&a->r);
182 	if(chatty&THREAD)
183 		fprint(2, "wokeup\n");
184 	qunlock(a->r.l);
185 	if(chatty&THREAD)
186 		fprint(2, "unlocked\n");
187 
188 	while(nproc--)
189 		recvul(chan);
190 }
191 
192