1 /* Test groups of 20 processes spraying to 20 receivers */ 2 #include <stdio.h> 3 #include <string.h> 4 #include <errno.h> 5 #include <unistd.h> 6 #include <stdlib.h> 7 #include <semaphore.h> 8 #include <sys/types.h> 9 #include <sys/socket.h> 10 #include <sys/wait.h> 11 #include <sys/time.h> 12 #include <sys/poll.h> 13 #include "interbench.h" 14 15 #define DATASIZE 100 16 #define LOOPS 100 17 #define NUM_FDS 20 18 19 static inline void barf(const char *msg) 20 { 21 terminal_error(msg); 22 } 23 24 static void fdpair(int fds[2]) 25 { 26 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) 27 barf("Creating fdpair"); 28 } 29 30 /* Block until we're ready to go */ 31 static void ready(int ready_out, int wakefd) 32 { 33 char dummy; 34 struct pollfd pollfd = { .fd = wakefd, .events = POLLIN }; 35 36 /* Tell them we're ready. */ 37 if (write(ready_out, &dummy, 1) != 1) 38 barf("CLIENT: ready write"); 39 40 /* Wait for "GO" signal */ 41 if (poll(&pollfd, 1, -1) != 1) 42 barf("poll"); 43 } 44 45 /* Sender sprays LOOPS messages down each file descriptor */ 46 static void sender(int out_fd[NUM_FDS], 47 int ready_out, 48 int wakefd) 49 { 50 char data[DATASIZE]; 51 unsigned int i, j; 52 53 ready(ready_out, wakefd); 54 55 /* Now pump to every receiver. */ 56 for (i = 0; i < LOOPS; i++) { 57 for (j = 0; j < NUM_FDS; j++) { 58 int ret; 59 unsigned long done = 0; 60 61 again: 62 ret = write(out_fd[j], data + done, sizeof(data)-done); 63 if (ret < 0) 64 barf("SENDER: write"); 65 done += ret; 66 if (done < sizeof(data)) 67 goto again; 68 } 69 } 70 } 71 72 /* One receiver per fd */ 73 static void receiver(unsigned int num_packets, 74 int in_fd, 75 int ready_out, 76 int wakefd) 77 { 78 unsigned int i; 79 80 /* Wait for start... */ 81 ready(ready_out, wakefd); 82 83 /* Receive them all */ 84 for (i = 0; i < num_packets; i++) { 85 char data[DATASIZE]; 86 int ret, done = 0; 87 88 again: 89 ret = Read(in_fd, data + done, DATASIZE - done); 90 done += ret; 91 if (done < DATASIZE) 92 goto again; 93 } 94 } 95 96 /* One group of senders and receivers */ 97 static unsigned int group(int ready_out, 98 int wakefd) 99 { 100 unsigned int i; 101 int out_fds[NUM_FDS]; 102 103 for (i = 0; i < NUM_FDS; i++) { 104 int fds[2]; 105 106 /* Create the pipe between client and server */ 107 fdpair(fds); 108 109 /* Fork the receiver. */ 110 switch (fork()) { 111 case -1: barf("fork()"); 112 case 0: 113 close(fds[1]); 114 receiver(NUM_FDS*LOOPS, fds[0], ready_out, wakefd); 115 exit(0); 116 } 117 118 out_fds[i] = fds[1]; 119 close(fds[0]); 120 } 121 122 /* Now we have all the fds, fork the senders */ 123 for (i = 0; i < NUM_FDS; i++) { 124 switch (fork()) { 125 case -1: barf("fork()"); 126 case 0: 127 sender(out_fds, ready_out, wakefd); 128 exit(0); 129 } 130 } 131 132 /* Close the fds we have left */ 133 for (i = 0; i < NUM_FDS; i++) 134 close(out_fds[i]); 135 136 /* Return number of children to reap */ 137 return NUM_FDS * 2; 138 } 139 140 void *hackbench_thread(void *t) 141 { 142 unsigned int i, num_groups, total_children; 143 int readyfds[2], wakefds[2]; 144 char dummy; 145 146 num_groups = 50; 147 t = 0; 148 149 fdpair(readyfds); 150 fdpair(wakefds); 151 152 while (1) { 153 total_children = 0; 154 for (i = 0; i < num_groups; i++) 155 total_children += group(readyfds[1], wakefds[0]); 156 157 /* Wait for everyone to be ready */ 158 for (i = 0; i < total_children; i++) 159 if (Read(readyfds[0], &dummy, 1) != 1) 160 barf("Reading for readyfds"); 161 162 /* Kick them off */ 163 if (write(wakefds[1], &dummy, 1) != 1) 164 barf("Writing to start them"); 165 166 /* Reap them all */ 167 for (i = 0; i < total_children; i++) { 168 int status; 169 wait(&status); 170 if (!WIFEXITED(status)) 171 exit(1); 172 } 173 if (!trywait_sem(&hackthread.sem.stop)) 174 break; 175 } 176 177 post_sem(&hackthread.sem.complete); 178 return NULL; 179 } 180