1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1999-2013 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <glenn.s.fowler@gmail.com> *
18 * *
19 ***********************************************************************/
20 #include "dttest.h"
21
22 #include <sys/mman.h>
23
/* Test concurrent usage of the method Dtoset by several processes
** operating on a single dictionary kept in shared memory.
**
** Written by Kiem-Phong Vo
*/
28
/* Number of worker processes; a build may predefine it to override. */
#ifndef N_PROC
#define N_PROC		64
#endif

/* Workload sizing. */
#define N_OBJ		(N_PROC * 64 * 1024)	/* total #objects, over all processes */
#define SEARCH		4			/* #lookups done after each insert/delete */
#define PROGRESS	(N_OBJ / (N_PROC * 4))	/* progress message every this many objects */

/* printf format of an object's key: 9 decimal digits, zero-filled. */
#define FORMAT		"%09d"

/* Bits kept in Obj_t.flag. */
#define INSERT		(1 << 0)	/* the object was inserted */
#define DELETE		(1 << 1)	/* the object was deleted */
41
42 typedef struct _obj_s
43 { Dtlink_t link;
44 unsigned int flag; /* INSERT/DELETE state */
45 char str[12]; /* string representation */
46 } Obj_t;
47
48 typedef struct _proc_s
49 { Obj_t* obj; /* list of objects to add */
50 ssize_t objn; /* number of objects in list */
51 } Proc_t;
52
/* Counters the processes use to move through the test phases together. */
typedef struct _state_s {
	unsigned int	insert;	/* #processes that are ready to start inserting */
	unsigned int	idone;	/* #processes that have finished inserting */
	unsigned int	delete;	/* #processes that are ready to start deleting */
	unsigned int	ddone;	/* #processes that have finished deleting */
} State_t;
59
60 typedef struct _disc_s
61 { Dtdisc_t disc;
62 unsigned int lock;
63 unsigned char* addr;
64 ssize_t size;
65 } Disc_t;
66
67 static Disc_t *Disc; /* shared discipline structure */
68 static Obj_t *Obj; /* shared object list */
69 static Proc_t Proc[N_PROC]; /* process workloads */
70 static int Pnum = N_PROC+1; /* start as parent */
71
72 static int Icount; /* # insertions done */
73 static int Dcount; /* # deletions done */
74
75 static State_t *State; /* insert/delete states */
76
77 /* memory allocator for shared dictionary - no freeing here */
memory(Dt_t * dt,Void_t * addr,size_t size,Dtdisc_t * disc)78 static Void_t* memory(Dt_t* dt, Void_t* addr, size_t size, Dtdisc_t* disc)
79 {
80 int k;
81 Disc_t *dc = (Disc_t*)disc;
82
83 if(addr || size <= 0 ) /* no freeing */
84 return NIL(Void_t*);
85
86 for(k = 0;; asorelax(1<<k), k = (k+1)&07 )
87 if(asocasint(&dc->lock, 0, 1) == 0) /* get exclusive use first */
88 break;
89
90 size = ((size + sizeof(Void_t*) - 1)/sizeof(Void_t*))*sizeof(Void_t*);
91 if(size <= dc->size)
92 { addr = (Void_t*)dc->addr;
93 dc->addr += size;
94 dc->size -= size;
95 }
96 else terror("Out of shared memory");
97
98 asocasint(&dc->lock, 1, 0); /* release exclusive use */
99
100 return addr;
101 }
102
/* SIGCHLD handler: reap one child, report how it terminated and fail the
** test through terror() when its wait status is nonzero.
**
** sig: the signal being delivered (not used).
*/
static void sigchild(int sig)
{
	pid_t	pid;
	int	status = 0;
	char	*st, buf[128];

	/* without a reaped child 'status' carries no information to report */
	if((pid = wait(&status)) < 0)
	{	signal(SIGCHLD, sigchild);
		return;
	}

	if(WIFSIGNALED(status))
	{	int	killer = WTERMSIG(status);

		/* a core dump can only accompany death by signal, so test for it here */
		sprintf(buf, "signal %s%s",
			killer == SIGFPE ? "fpe" : killer == SIGSEGV ? "segv" : "whatever",
			WCOREDUMP(status) ? ", coredump" : "");
		st = buf;
	}
	else	st = "normal";

	tinfo("Child process %d exited (%s)", (int)pid, st);
	if(status) /* the value printed is the raw wait status */
		terror("Child process %d exit code %d", (int)pid, status);

	signal(SIGCHLD, sigchild); /* install the handler again for the next child */
}
123
/* Carry out one process's share of the test on the shared dictionary.
**
** dt:   the dictionary
** proc: the objects this process inserts and afterwards deletes
** p:    zero-based number of this process; used in messages and for Pnum
**
** The insertion and deletion phases each begin only after all N_PROC
** processes have registered in State. Between the two phases the function
** waits until State->idone has been set back to 0.
*/
static void workload(Dt_t* dt, Proc_t* proc, int p)
{
	Obj_t	*want, *got;
	ssize_t	k, s;
	pid_t	pid = getpid();

	Pnum = p+1; /* always positive */

	/* insertion phase */
	asoincint(&State->insert); /* announce that this process is ready */
	while(asogetint(&State->insert) != N_PROC) /* wait for the others */
		usleep(100);
	for(k = 0; k < proc->objn; ++k)
	{	if(k && k%PROGRESS == 0)
			tinfo("\tProcess %d(%d): insertion passing %d", p, (int)pid, (int)k);

		want = proc->obj+k;
		if((got = dtinsert(dt,want)) != want)
			tinfo("\t\tProcess %d(%d): Insert %s, get %p", p, (int)pid, want->str, (void*)got);
		else	got->flag |= INSERT;
		if((got = dtsearch(dt, want)) != want)
			tinfo("\t\tProcess %d(%d): Just inserted %s but not found", p, (int)pid, want->str);
		Icount += 1;

		if(k > SEARCH) /* look up a few objects inserted earlier */
		{	for(s = 0; s < SEARCH; ++s)
			{	ssize_t r = random()%k;
				want = proc->obj+r;
				got = dtsearch(dt,want);
				if(got != want)
					tinfo("\t\tProcess %d(%d): Srch %s(Max %s) get %p",
						p, (int)pid, want->str, proc->obj[k].str, (void*)got);
			}
		}
	}
	tinfo("Process %d(%d): insertion done", p, (int)pid);
	asoincint(&State->idone); /* announce that this workload is inserted */
	while(asogetint(&State->idone) > 0) /* wait for the counter to be cleared */
		usleep(100);

	/* deletion phase */
	asoincint(&State->delete); /* announce that this process is ready */
	while(asogetint(&State->delete) != N_PROC) /* wait for the others */
		usleep(100);
	for(k = 0; k < proc->objn; ++k)
	{	if(k && k%PROGRESS == 0)
			tinfo("\tProcess %d(%d): deletion passing %d", p, (int)pid, (int)k);

		/* record the deletion only if the dictionary hands back this very object */
		if(dtdelete(dt, proc->obj+k) == proc->obj+k)
			proc->obj[k].flag |= DELETE;
		Dcount += 1;

		if(k > SEARCH) /* objects deleted earlier must no longer be found */
		{	for(s = 0; s < SEARCH; ++s)
			{	ssize_t r = random()%k;
				want = proc->obj+r;
				if(dtsearch(dt,want))
					tinfo("\t\tProcess %d(%d): Search %s !NULL", p, (int)pid, want->str);
			}
		}
	}
	asoincint(&State->ddone); /* announce that this workload is deleted */
	tinfo("Process %d(%d): deletion done", p, (int)pid);
}
188
tmain()189 tmain()
190 {
191 ssize_t k, z, objn;
192 Obj_t *o;
193 Dt_t *dt;
194 pid_t pid[N_PROC];
195 int zerof;
196
197 tchild();
198
199 if((zerof = open("/dev/zero", O_RDWR)) < 0)
200 terror("Can't open /dev/zero");
201
202 /* get shared memory */
203 if((k = 4*N_OBJ*sizeof(Void_t*)) < 64*1024*1024 )
204 k = 64*1024*1024;
205 z = sizeof(State_t) /* insert/delete states */ +
206 sizeof(Disc_t) /* discipline */ +
207 N_OBJ*sizeof(Obj_t) /* Obj */ +
208 k; /* table memory */
209 State = (State_t*)mmap(0,z,PROT_READ|PROT_WRITE,MAP_SHARED,zerof,0);
210 if(!State || State == (State_t*)(-1))
211 terror("mmap failed");
212 Disc = (Disc_t*)(State+1);
213 Obj = (Obj_t*)(Disc+1);
214 Disc->addr = (unsigned char*)(Obj+N_OBJ);
215 Disc->size = k;
216
217 memset(State, 0, sizeof(State_t));
218
219 /* construct the objects to be inserted */
220 for(k = 0; k < N_OBJ; ++k)
221 { Obj[k].flag = 0;
222 sprintf(Obj[k].str, FORMAT, k);
223 }
224
225 /* construct the work-load for each process */
226 objn = N_OBJ/N_PROC;
227 Proc[0].obj = &Obj[0];
228 Proc[0].objn = objn;
229 for(k = 1; k < N_PROC; ++k)
230 { Proc[k].obj = Proc[k-1].obj + Proc[k-1].objn;
231 Proc[k].objn = k < (N_PROC-1) ? objn : N_OBJ - (k*objn);
232 }
233
234 /* now create the shared dictionary */
235 Disc->disc.key = DTOFFSET(Obj_t,str);
236 Disc->disc.size = 0;
237 Disc->disc.link = 0;
238 Disc->disc.makef = NIL(Dtmake_f);
239 Disc->disc.freef = NIL(Dtfree_f);
240 Disc->disc.comparf = NIL(Dtcompar_f);
241 Disc->disc.hashf = NIL(Dthash_f);
242 Disc->disc.memoryf = memory;
243 Disc->disc.eventf = NIL(Dtevent_f);
244 if(!(dt = dtopen(&Disc->disc, Dtoset)) )
245 terror("Cannot open dictionary");
246 if(dtcustomize(dt, DT_SHARE, 1) != DT_SHARE )
247 terror("Cannot turn on share mode");
248
249 #if N_PROC == 1
250 Pnum = 1;
251 workload(dt, Proc, 0);
252 #else
253 signal(SIGCHLD, sigchild);
254 for(k = 0; k < N_PROC; ++k)
255 { if((pid[k] = fork()) < 0 )
256 terror("Can't create child process");
257 else if(pid[k] > 0) /* parent */
258 tinfo("Just launched process %d (pid=%d)", k, pid[k]);
259 else
260 { Pnum = k+1;
261 signal(SIGCHLD,SIG_IGN);
262 workload(dt, Proc+k, k);
263 texit(0);
264 }
265 }
266 #endif
267
268 tinfo("\ttsafetree: Insertion #procs=%d (free shared mem=%d)", k, Disc->size);
269 for(k = 0;; asorelax(1<<k), k = (k+1)&07)
270 if(asogetint(&State->idone) == N_PROC)
271 break;
272 tinfo("\ttsafetree: Insertion completed, checking integrity");
273
274 for(k = 0; k < N_OBJ; ++k)
275 if(!dtsearch(dt, Obj+k) )
276 terror("Failed to find object %s", Obj[k].str);
277
278 asocasint(&State->idone, N_PROC, 0);
279
280 tinfo("\ttsafetree: Deletion (free shared mem=%d)", Disc->size);
281 for(k = 0;; asorelax(1<<k), k = (k+1)&07 )
282 if(asogetint(&State->ddone) == N_PROC) /* wait until all are deleted */
283 break;
284 if(dtfirst(dt) )
285 terror("Dictionary not empty after deletion!");
286
287 z = 0;
288 for(k = 0; k < N_OBJ; ++k)
289 if((Obj[k].flag & DELETE) )
290 z += 1;
291 if(z != N_OBJ)
292 twarn("Some deletion was not properly recorded?");
293
294 tinfo("\ttsafetree: All testing done.");
295 twait(pid, -N_PROC);
296
297 texit(0);
298 }
299