1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <math.h>
4 #include <string.h>
5
6 #include "config.h"
7 #include "ev_dfg.h"
8 #include "test_support.h"
9
10 CManager cm;
11
12 static int status;
13 static EVdfg dfg;
14 static int base=2;
15 static int nbase = 2;
16 static int level_count = 3;
17
18 static EVdfg_stone *tmp;
19 static EVdfg_stone *last;
20
21 const int reconfig_node_count = 3;
22 const int num_terminals = 2;
23
24 char *str_contact;
25 char **reconfig_list = NULL;
26
27 static
28 int
simple_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)29 simple_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
30 {
31 simple_rec_ptr event = vevent;
32 (void)cm;
33 (void)client_data;
34 static int count;
35 checksum_simple_record(event, attrs, quiet);
36 if (++count == 300) {
37 EVclient_shutdown(dfg, 0);
38 }
39 printf("\nStatic configuration working, ready for reconfiguration! All the best!\n");
40 fflush(stdout);
41 return 0;
42 }
43
44
/* ****** Algorithm ******

 - The initial tree will consist of two nodes: the root and one child.

 - The first child freezes the dataflow and deletes the link between the root and its only child.
 - The first child then forks n children, which will then attempt to join the dfg.
 - The first child does not call realize; the dataflow stays frozen.

 - Every node joining in creates stones and links them appropriately.
 - Depending on the number of the node joining in, it will be either an intermediate node or a leaf node.
 - Leaf nodes will have terminal actions associated with them and intermediate nodes will have split actions (as of now).
 - When the appropriate number of nodes have joined in, the graph is realized.
 - The above three points can be implemented by basically unrolling the loops in the static tree_test code.

 - Timing measurements can be made by starting the timer in the node_register_handler only when the 2nd node for reconfiguration joins in.
 - The timer will be stopped when the graph has been realized, mostly inside node_register_handler.

 ****** ********* ****** */
63
64 static void
join_handler(EVdfg dfg,char * identifier,void * available_sources,void * available_sinks)65 join_handler(EVdfg dfg, char *identifier, void* available_sources, void *available_sinks)
66 {
67
68 static int joined_node_count;
69 static int i = 2;
70 static int j;
71 static int k;
72
73 char *thandle;
74 char *canon_name = malloc(20 * sizeof(char));
75
76 if (strcmp(identifier, "origin") == 0) {
77 EVdfg_stone terminal = NULL;
78
79 canon_name = strdup("origin");
80 EVmaster_assign_canonical_name(dfg, identifier, canon_name);
81
82 EVclient_register_sink_handler(cm, "thandler", simple_format_list, (EVSimpleHandlerFunc) simple_handler);
83 terminal = EVdfg_create_sink_stone(dfg, "thandler");
84
85 EVdfg_link_port(last[0], 0, terminal);
86 EVdfg_assign_node(last[0], "origin");
87 EVdfg_assign_node(terminal, "origin");
88
89 ++joined_node_count;
90 EVdfg_realize(dfg);
91 } else {
92 if (strcmp(identifier, "forker") == 0) {
93 // EVdfg_reconfig_delete_link(dfg, 0, 1);
94
95 // canon_name = malloc(20 * sizeof(char));
96 ++joined_node_count;
97
98 // delayed_fork_children(cm, &reconfig_list[0], str_contact, 5);
99 } else {
100 if (i < level_count) {
101 // if (j < nbase) {
102 tmp[j] = EVdfg_create_stone(dfg, NULL);
103
104 sprintf(canon_name, "client%d", joined_node_count);
105 EVmaster_assign_canonical_name(dfg, identifier, canon_name);
106 EVdfg_assign_node(tmp[j], canon_name);
107
108 EVdfg_reconfig_link_port(last[j / base], j % base, tmp[j], NULL);
109
110 ++joined_node_count;
111 ++j;
112 // }
113 if (j >= nbase) {
114 for (j = 0; j < nbase; ++j) {
115 last[j] = tmp[j];
116 }
117 nbase *= base;
118 j = 0;
119 ++i;
120 }
121 } else {
122 thandle = strdup("thandler");
123 // sprintf(thandle, "thandler%d", joined_node_count);
124 EVclient_register_sink_handler(cm, thandle, simple_format_list, (EVSimpleHandlerFunc) simple_handler);
125 tmp[k] = EVdfg_create_sink_stone(dfg, thandle);
126
127 sprintf(canon_name, "terminal_node%d", joined_node_count);
128 EVmaster_assign_canonical_name(dfg, identifier, canon_name);
129 EVdfg_assign_node(tmp[k], canon_name);
130
131 EVdfg_reconfig_link_port(last[k / base], k % base, tmp[k], NULL);
132
133 ++k;
134 ++joined_node_count;
135 if (k >= nbase) {
136 EVdfg_realize(dfg);
137 }
138 }
139 }
140 }
141 }
142
143
144 extern int
be_test_master(int argc,char ** argv)145 be_test_master(int argc, char **argv)
146 {
147 attr_list contact_list;
148 EVsource source_handle;
149 int last_row_size;
150 int i;
151 char **nodes;
152
153 nodes = malloc(2 * sizeof(nodes[0]));
154
155 nodes[0] = strdup("origin");
156 nodes[1] = NULL;
157
158 last_row_size = (int) pow(base, level_count - 1);
159
160 tmp = malloc(last_row_size * sizeof(EVdfg_stone));
161 last = malloc(last_row_size * sizeof(EVdfg_stone));
162
163 cm = CManager_create();
164 CMlisten(cm);
165 contact_list = CMget_contact_list(cm);
166 str_contact = attr_list_to_string(contact_list);
167
168 printf("\nMaster's contact = %s\n", str_contact);
169 fflush(stdout);
170
171 source_handle = EVcreate_submit_handle(cm, DFG_SOURCE ,simple_format_list);
172 source_capabilities = EVclient_register_source("master_source", source_handle);
173
174 dfg = EVdfg_create(cm);
175 EVmaster_node_join_handler(dfg, join_handler);;
176
177 last[0] = EVdfg_create_source_stone(dfg, "master_source");
178
179 /* pprabhu: creating list of reconfiguration node names */
180 reconfig_list = malloc(sizeof(reconfig_list[0]) * (reconfig_node_count + 2));
181 for (i=0; i < (reconfig_node_count + 1); i++) {
182 reconfig_list[i] = strdup("client");
183 }
184 reconfig_list[reconfig_node_count + 1] = NULL;
185
186 EVdfg_join_dfg(dfg, "origin", str_contact);
187
188 if (EVclient_ready_wait(dfg) != 1) {
189 /* dfg initialization failed! */
190 exit(1);
191 }
192
193
194 if (EVclient_active_sink_count(dfg) == 0) {
195 EVclient_ready_for_shutdown(dfg);
196 }
197
198 if (EVclient_source_active(source_handle)) {
199 simple_rec rec;
200 for (i = 0; i < 300; ++i) {
201 CMsleep(cm, 1);
202 generate_simple_record(&rec);
203 /* submit would be quietly ignored if source is not active */
204 EVsubmit(source_handle, &rec, NULL);
205 }
206 }
207
208 status = EVclient_wait_for_shutdown(dfg);
209
210 wait_for_children(nodes);
211
212 CManager_close(cm);
213 return status;
214 }
215
216
217 extern int
be_test_child(int argc,char ** argv)218 be_test_child(int argc, char **argv)
219 {
220 CManager cm;
221 EVsource src;
222 char *chandle;
223
224 int i;
225
226 cm = CManager_create();
227
228 if (argc != 3) {
229 printf("Child usage: evtest <nodename> <mastercontact>\n");
230 exit(1);
231 }
232
233 dfg = EVdfg_create(cm);
234
235
236 reconfig_list = malloc(sizeof(reconfig_list[0]) * (reconfig_node_count + 2));
237 for (i=0; i < (reconfig_node_count + 1); i++) {
238 reconfig_list[i] = strdup("client");
239 }
240 reconfig_list[reconfig_node_count + 1] = NULL;
241
242 src = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
243 source_capabilities = EVclient_register_source("master_source", src);
244 chandle = strdup("thandler");
245
246 sink_capabilities = EVclient_register_sink_handler(cm,chandle, simple_format_list,
247 (EVSimpleHandlerFunc) simple_handler);
248 EVdfg_join_dfg(dfg, argv[1], argv[2]);
249
250 if (strcmp(argv[1], "forker") == 0) {
251 test_fork_children(&reconfig_list[0], argv[2]);
252 }
253
254 EVclient_ready_wait(dfg);
255 if (EVclient_active_sink_count(dfg) == 0) {
256 EVclient_ready_for_shutdown(dfg);
257 }
258
259 if (EVclient_source_active(src)) {
260 simple_rec rec;
261 generate_simple_record(&rec);
262 /* submit will be quietly ignored if source is not active */
263 EVsubmit(src, &rec, NULL);
264 }
265 return EVclient_wait_for_shutdown(dfg);
266 }
267