1 #include <u.h>
2 #include <libc.h>
3 #include <bio.h>
4 #include <thread.h>
5 #include <ixp.h>
6
7 extern char *(*_syserrstr)(void);
8 char *path;
9
10 enum {
11 DATA = 1,
12 THREAD = 2,
13 TRACE = 4,
14 READ = 8,
15 };
16 int chatty = READ;
17
18 typedef struct arg arg;
19 typedef struct arg2 arg2;
20 struct arg {
21 Rendez r;
22 IxpCFid *f;
23 Channel *ch;
24 int j, k;
25 };
26
27 struct arg2 {
28 Rendez r;
29 IxpClient *c;
30 Channel *ch;
31 int j;
32 };
33
34 Channel *chan;
35 int nproc;
36
37 void
spawn(void (* f)(void *),void * v,int s)38 spawn(void(*f)(void*), void *v, int s) {
39 nproc++;
40 proccreate(f, v, s);
41 }
42
43 void
_print(Biobuf * b,char * p,char * end,int j,int k)44 _print(Biobuf *b, char *p, char *end, int j, int k) {
45 for(; p < end; p++) {
46 Bputc(b, *p);
47 if(*p == '\n') {
48 Bflush(b);
49 Bprint(b, ":: %d %d: ", j, k);
50 }
51 }
52 }
53
54 void
readfile(IxpCFid * f,int j,int k)55 readfile(IxpCFid *f, int j, int k) {
56 Biobuf *b;
57 char *buf;
58 int n;
59
60 if(chatty&TRACE)
61 fprint(2, "readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit);
62
63 b = Bfdopen(dup(1, -1), OWRITE);
64 if(chatty&DATA)
65 Bprint(b, ":: %d %d: ", j, k);
66
67 buf = ixp_emalloc(f->iounit);
68 while((n = ixp_read(f, buf, f->iounit)) > 0) {
69 if(chatty&READ)
70 fprint(2, "+readfile(%p, %d, %d) n=%d\n", f, j, k, n);
71 if(chatty&DATA)
72 _print(b, buf, buf+n, j, k);
73 sleep(0);
74 }
75
76 if(chatty&TRACE)
77 fprint(2, "-readfile(%p, %d, %d) iounit: %d\n", f, j, k, f->iounit);
78 if(chatty&DATA)
79 Bputc(b, '\n');
80 Bterm(b);
81 }
82
83 static void
_read(void * p)84 _read(void *p) {
85 arg *a;
86 int k;
87
88 a = p;
89 k = a->k;
90 if(chatty&THREAD)
91 print("Start _read: %d\n", a->j, k);
92
93 qlock(a->r.l);
94 sendul(a->ch, 0);
95 rsleep(&a->r);
96 if(chatty&THREAD)
97 print("Wake _read: %d\n", a->j, k);
98 qunlock(a->r.l);
99
100 readfile(a->f, a->j, k);
101 sendul(chan, 0);
102 }
103
104 static void
_open(void * p)105 _open(void *p) {
106 arg2 *a2;
107 arg *a;
108
109 a = malloc(sizeof *a);
110
111 a2 = p;
112
113 a->j = a2->j;
114 memset(&a->r, 0, sizeof(a->r));
115 a->r.l = mallocz(sizeof(QLock), 1);
116 a->ch = chancreate(sizeof(ulong), 0);
117
118 if(chatty&THREAD)
119 print("Start _open: %d\n", a2->j);
120
121 qlock(a2->r.l);
122 sendul(a2->ch, 0);
123 rsleep(&a2->r);
124 if(chatty&THREAD)
125 print("Wake _open: %d\n", a2->j);
126 qunlock(a2->r.l);
127
128 a->f = ixp_open(a2->c, path, OREAD);
129 if(a->f == nil)
130 sysfatal("can't open %q: %r\n", path);
131 sleep(0);
132
133 for(a->k = 0; a->k < 5; a->k++) {
134 spawn(_read, a, mainstacksize);
135 recvul(a->ch);
136 }
137
138 qlock(a->r.l);
139 rwakeupall(&a->r);
140 qunlock(a->r.l);
141
142 sendul(chan, 0);
143 }
144
145 const char *_malloc_options = "A";
146
147 void
threadmain(int argc,char * argv[])148 threadmain(int argc, char *argv[]) {
149 arg2 *a;
150 char *address;
151
152 USED(argc, argv);
153 address = "unix!/tmp/ns.kris.:0/wmii";
154 address = "tcp!localhost!6663";
155 path = "/n/local/var/log/messages";
156
157 a = malloc(sizeof *a);
158 chan = chancreate(sizeof(ulong), 0);
159
160 quotefmtinstall();
161
162 _syserrstr = ixp_errbuf;
163 if(ixp_pthread_init())
164 sysfatal("can't init pthread: %r\n");
165
166 a->c = ixp_mount(address);
167 if(a->c == nil)
168 sysfatal("can't mount: %r\n");
169
170 memset(&a->r, 0, sizeof(a->r));
171 a->r.l = mallocz(sizeof(QLock), 1);
172 a->ch = chancreate(sizeof(ulong), 0);
173 for(a->j = 0; a->j < 5; a->j++) {
174 spawn(_open, a, mainstacksize);
175 recvul(a->ch);
176 }
177
178 qlock(a->r.l);
179 if(chatty&THREAD)
180 fprint(2, "qlock()\n");
181 rwakeupall(&a->r);
182 if(chatty&THREAD)
183 fprint(2, "wokeup\n");
184 qunlock(a->r.l);
185 if(chatty&THREAD)
186 fprint(2, "unlocked\n");
187
188 while(nproc--)
189 recvul(chan);
190 }
191
192