1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1999-2013 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                                                                      *
19 ***********************************************************************/
20 #include	"dttest.h"
21 
22 #include	<sys/mman.h>
23 
/* Test concurrent usage of the method Dtoset.
**
** Written by Kiem-Phong Vo
*/
28 
/* N_PROC is the number of worker processes. A definition supplied
** before this point (e.g. on the compiler command line) takes
** precedence over the default of 64.
*/
#ifndef N_PROC
#define N_PROC		64
#endif

#define	N_OBJ		(N_PROC*64*1024) /* total #objects: 64K per process	*/
#define SEARCH		4	/* #random searches after each insert/delete once k > SEARCH */
#define PROGRESS	(N_OBJ/(N_PROC*4)) /* progress is logged when the loop index is a multiple of this */

#define FORMAT	"%09d"	/* key text: zero-filled 9-digit number (10 bytes with NUL, fits Obj_t.str) */

/* bits kept in Obj_t.flag */
#define INSERT	001	/* set by the worker whose dtinsert() returned the object itself */
#define DELETE	002	/* set by the worker whose dtdelete() returned the object itself */
41 
/* One dictionary element. The array of these lives in the shared
** mapping set up by tmain() and is keyed by the str member.
*/
typedef struct _obj_s
{	Dtlink_t	link;	/* dictionary link; tmain() sets disc.link to 0 -- presumably this must stay the first member, confirm against CDT docs */
	unsigned int	flag;	/* INSERT/DELETE state		*/
	char		str[12]; /* key: the object's index printed with FORMAT */
} Obj_t;
47 
/* The slice of the shared Obj array that one worker process
** inserts and later deletes.
*/
typedef struct _proc_s
{	Obj_t*		obj;	/* first object of this process's slice */
	ssize_t		objn;	/* number of objects in the slice	*/
} Proc_t;
52 
/* Counters placed at the start of the shared mapping and used as
** barriers between the workers and the parent.
*/
typedef struct _state_s
{	unsigned int	insert;	/* #workers ready to insert; they start once it equals N_PROC */
	unsigned int	idone;	/* #workers done inserting; the parent resets it to 0 to let them go on */
	unsigned int	delete;	/* #workers ready to delete; they start once it equals N_PROC */
	unsigned int	ddone;	/* #workers done deleting; the parent waits for N_PROC */
} State_t;
59 
/* Discipline extended with the state of the shared-memory allocator.
** memory() casts the Dtdisc_t* it receives back to Disc_t*, so disc
** has to remain the first member.
*/
typedef struct _disc_s
{	Dtdisc_t	disc;	/* discipline handed to dtopen()	*/
	unsigned int	lock;	/* allocator lock: 0 when free, 1 while memory() holds it */
	unsigned char*	addr;	/* next unallocated byte of the arena	*/
	ssize_t		size;	/* bytes still available at addr	*/
} Disc_t;
66 
static Disc_t		*Disc;	/* shared discipline structure 	*/
static Obj_t		*Obj;	/* shared object list		*/
static Proc_t		Proc[N_PROC]; /* process workloads	*/
static int		Pnum = N_PROC+1; /* N_PROC+1 in the parent, p+1 in worker p; not read in this file -- presumably used by the dttest.h helpers */

/* Ordinary statics, not part of the shared mapping, so each process
** keeps its own count. They are only incremented in this file.
*/
static int		Icount;	/* # insertions done		*/
static int		Dcount;	/* # deletions done		*/

static State_t		*State;	/* insert/delete states; points at the start of the shared mapping */
76 
77 /* memory allocator for shared dictionary - no freeing here */
memory(Dt_t * dt,Void_t * addr,size_t size,Dtdisc_t * disc)78 static Void_t* memory(Dt_t* dt, Void_t* addr, size_t size, Dtdisc_t* disc)
79 {
80 	int	k;
81 	Disc_t	*dc = (Disc_t*)disc;
82 
83 	if(addr || size <= 0 ) /* no freeing */
84 		return NIL(Void_t*);
85 
86 	for(k = 0;; asorelax(1<<k), k = (k+1)&07 )
87 		if(asocasint(&dc->lock, 0, 1) == 0) /* get exclusive use first */
88 			break;
89 
90 	size = ((size + sizeof(Void_t*) - 1)/sizeof(Void_t*))*sizeof(Void_t*);
91 	if(size <= dc->size)
92 	{	addr = (Void_t*)dc->addr;
93 		dc->addr += size;
94 		dc->size -= size;
95 	}
96 	else	terror("Out of shared memory");
97 
98 	asocasint(&dc->lock, 1, 0); /* release exclusive use */
99 
100 	return addr;
101 }
102 
/* SIGCHLD handler: reap one child and report how it terminated.
**
** sig is the delivered signal number and is not used.
**
** A child killed by a signal is described as "signal fpe", "signal segv"
** or "signal whatever", with ", coredump" appended when WCOREDUMP says so.
** Any non-zero wait status is additionally reported through terror().
** The handler re-installs itself before returning.
*/
static void sigchild(int sig)
{	pid_t	pid;
	int	status = 0;
	int	code;
	char	*how = "normal", *core = "";

	(void)sig;

	if((pid = wait(&status)) < 0)
	{	/* nothing was reaped, so status says nothing about any child */
		signal(SIGCHLD, sigchild);
		return;
	}

	if(WIFSIGNALED(status))
	{	int	tsig = WTERMSIG(status);

		how = tsig == SIGFPE ? "signal fpe" : tsig == SIGSEGV ? "signal segv" : "signal whatever";

		/* the core-dump bit is only meaningful for a signalled child */
		if(WCOREDUMP(status))
			core = ", coredump";
	}

	tinfo("Child process %d exited (%s%s)", (int)pid, how, core);
	if(status)
	{	/* show the real exit code when there is one, else the raw status */
		code = WIFEXITED(status) ? WEXITSTATUS(status) : status;
		terror("Child process %d exit code %d", (int)pid, code);
	}
	signal(SIGCHLD, sigchild);
}
123 
/* Run one worker's share of the test against the shared dictionary.
**
** dt:   the shared dictionary
** proc: the slice of objects this worker inserts and then deletes;
**       callers pass Proc+p
** p:    zero-based worker index, used in messages and to set Pnum
**
** The phases are lined up with the other workers and the parent
** through the counters in State:
**   insert - bumped when ready; insertion starts once it reaches N_PROC
**   idone  - bumped after insertion; the worker then waits for it to be 0
**   delete - bumped when ready; deletion starts once it reaches N_PROC
**   ddone  - bumped after deletion
** Anomalies are only logged with tinfo(); the function returns nothing.
*/
static void workload(Dt_t* dt, Proc_t* proc, int p)
{
	Obj_t	*obj, *found;
	ssize_t	k, s;
	int	id = (int)getpid(); /* as int so that it matches %d below */

	Pnum = p+1; /* always positive */

	/* insert objects in 'p' */
	asoincint(&State->insert); /* signaling that we are ready to go */
	while(asogetint(&State->insert) != N_PROC) /* wait until all processes are set */
		usleep(100);
	for(k = 0; k < proc->objn; ++k)
	{	if(k && k%PROGRESS == 0)
			tinfo("\tProcess %d(%d): insertion passing %d", p, id, (int)k);

		obj = proc->obj+k;
		if((found = dtinsert(dt,obj)) != obj)
			tinfo("\t\tProcess %d(%d): Insert %s, get %p", p,id,obj->str,(void*)found);
		else	found->flag |= INSERT;
		if((found = dtsearch(dt, obj)) != obj)
			tinfo("\t\tProcess %d(%d): Just inserted %s but not found", p,id,obj->str);
		Icount += 1;

		if(k > SEARCH ) /* search a few elements known to be inserted */
		{	for(s = 0; s < SEARCH; ++s)
			{	ssize_t r = random()%k; /* r < k: inserted on an earlier iteration */
				obj = proc->obj+r;
				found = dtsearch(dt,obj);
				if(found != obj)
					tinfo("\t\tProcess %d(%d): Srch %s(Max %s) get %p",
						p,id, obj->str, proc->obj[k].str, (void*)found);
			}
		}
	}
	tinfo("Process %d(%d): insertion done", p,id);
	asoincint(&State->idone); /* signaling that this workload has been inserted */
	while(asogetint(&State->idone) > 0) /* wait until parent signal ok to continue */
		usleep(100);

	/* delete the objects in 'p' */
	asoincint(&State->delete); /* signaling that we are ready to delete */
	while(asogetint(&State->delete) != N_PROC) /* wait until all processes are set */
		usleep(100);
	for(k = 0; k < proc->objn; ++k)
	{	if(k && k%PROGRESS == 0)
			tinfo("\tProcess %d(%d): deletion passing %d", p, id, (int)k);

		obj = proc->obj+k;
		if(dtdelete(dt, obj) == obj )
			obj->flag |= DELETE;
		Dcount += 1;

		if(k > SEARCH) /* elements deleted earlier must no longer be found */
		{	for(s = 0; s < SEARCH; ++s)
			{	ssize_t r = random()%k; /* r < k: deleted on an earlier iteration */
				obj = proc->obj+r;
				if(dtsearch(dt,obj) )
					tinfo("\t\tProcess %d(%d): Search %s !NULL",p,id,obj->str);
			}
		}
	}
	asoincint(&State->ddone); /* signaling that workload has been deleted */
	tinfo("Process %d(%d): deletion done", p, id);
}
188 
tmain()189 tmain()
190 {
191 	ssize_t		k, z, objn;
192 	Obj_t		*o;
193 	Dt_t		*dt;
194 	pid_t		pid[N_PROC];
195 	int		zerof;
196 
197 	tchild();
198 
199 	if((zerof = open("/dev/zero", O_RDWR)) < 0)
200 		terror("Can't open /dev/zero");
201 
202 	/* get shared memory */
203 	if((k = 4*N_OBJ*sizeof(Void_t*)) <  64*1024*1024 )
204 		k = 64*1024*1024;
205 	z = sizeof(State_t) /* insert/delete states */ +
206 	    sizeof(Disc_t) /* discipline */ +
207 	    N_OBJ*sizeof(Obj_t) /*  Obj  */ +
208 	    k; /* table memory */
209 	State = (State_t*)mmap(0,z,PROT_READ|PROT_WRITE,MAP_SHARED,zerof,0);
210 	if(!State || State == (State_t*)(-1))
211 		terror("mmap failed");
212 	Disc = (Disc_t*)(State+1);
213 	Obj  = (Obj_t*)(Disc+1);
214 	Disc->addr = (unsigned char*)(Obj+N_OBJ);
215 	Disc->size = k;
216 
217 	memset(State, 0, sizeof(State_t));
218 
219 	/* construct the objects to be inserted */
220 	for(k = 0; k < N_OBJ; ++k)
221 	{	Obj[k].flag = 0;
222 		sprintf(Obj[k].str, FORMAT, k);
223 	}
224 
225 	/* construct the work-load for each process */
226 	objn = N_OBJ/N_PROC;
227 	Proc[0].obj  = &Obj[0];
228 	Proc[0].objn = objn;
229 	for(k = 1; k < N_PROC; ++k)
230 	{	Proc[k].obj  = Proc[k-1].obj + Proc[k-1].objn;
231 		Proc[k].objn = k < (N_PROC-1) ? objn : N_OBJ - (k*objn);
232 	}
233 
234 	/* now create the shared dictionary */
235 	Disc->disc.key     = DTOFFSET(Obj_t,str);
236 	Disc->disc.size    = 0;
237 	Disc->disc.link    = 0;
238 	Disc->disc.makef   = NIL(Dtmake_f);
239 	Disc->disc.freef   = NIL(Dtfree_f);
240 	Disc->disc.comparf = NIL(Dtcompar_f);
241 	Disc->disc.hashf   = NIL(Dthash_f);
242 	Disc->disc.memoryf = memory;
243 	Disc->disc.eventf  = NIL(Dtevent_f);
244 	if(!(dt = dtopen(&Disc->disc, Dtoset)) )
245 		terror("Cannot open dictionary");
246 	if(dtcustomize(dt, DT_SHARE, 1) != DT_SHARE )
247 		terror("Cannot turn on share mode");
248 
249 #if N_PROC == 1
250 	Pnum = 1;
251 	workload(dt, Proc, 0);
252 #else
253 	signal(SIGCHLD, sigchild);
254 	for(k = 0; k < N_PROC; ++k)
255 	{	if((pid[k] = fork()) < 0 )
256 			terror("Can't create child process");
257 		else if(pid[k] > 0) /* parent */
258 			tinfo("Just launched process %d (pid=%d)", k, pid[k]);
259 		else
260 		{	Pnum = k+1;
261 			signal(SIGCHLD,SIG_IGN);
262 			workload(dt, Proc+k, k);
263 			texit(0);
264 		}
265 	}
266 #endif
267 
268 	tinfo("\ttsafetree: Insertion #procs=%d (free shared mem=%d)", k, Disc->size);
269 	for(k = 0;; asorelax(1<<k), k = (k+1)&07)
270 		if(asogetint(&State->idone) == N_PROC)
271 			break;
272 	tinfo("\ttsafetree: Insertion completed, checking integrity");
273 
274 	for(k = 0; k < N_OBJ; ++k)
275 		if(!dtsearch(dt, Obj+k) )
276 			terror("Failed to find object %s", Obj[k].str);
277 
278 	asocasint(&State->idone, N_PROC, 0);
279 
280 	tinfo("\ttsafetree: Deletion (free shared mem=%d)", Disc->size);
281 	for(k = 0;; asorelax(1<<k), k = (k+1)&07 )
282 		if(asogetint(&State->ddone) == N_PROC) /* wait until all are deleted */
283 			break;
284 	if(dtfirst(dt) )
285 		terror("Dictionary not empty after deletion!");
286 
287 	z = 0;
288 	for(k = 0; k < N_OBJ; ++k)
289 		if((Obj[k].flag & DELETE) )
290 			z += 1;
291 	if(z != N_OBJ)
292 		twarn("Some deletion was not properly recorded?");
293 
294 	tinfo("\ttsafetree: All testing done.");
295 	twait(pid, -N_PROC);
296 
297 	texit(0);
298 }
299