1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <math.h>
4 #include <string.h>
5 
6 #include "config.h"
7 #include "ev_dfg.h"
8 #include "test_support.h"
9 
10 CManager cm;
11 
12 static int status;
13 static EVdfg dfg;
14 static int base=2;
15 static int nbase = 2;
16 static int level_count = 3;
17 
18 static EVdfg_stone *tmp;
19 static EVdfg_stone *last;
20 
21 const int reconfig_node_count = 3;
22 const int num_terminals = 2;
23 
24 char *str_contact;
25 char **reconfig_list = NULL;
26 
27 static
28 int
simple_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)29 simple_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
30 {
31     simple_rec_ptr event = vevent;
32     (void)cm;
33     (void)client_data;
34     static int count;
35     checksum_simple_record(event, attrs, quiet);
36     if (++count == 300) {
37 		EVclient_shutdown(dfg, 0);
38     }
39     printf("\nStatic configuration working, ready for reconfiguration! All the best!\n");
40     fflush(stdout);
41     return 0;
42 }
43 
44 
45 /* ****** Algorithm ******
46 
47  - The initial tree will consist of two nodes, root and one child
48 
49  - First child freezes the dataflow and deletes the link between the root and it's only child
50  - First child then forks n number of children which will now attempt to join the dfg
51  - First child does not call realize, dataflow is frozen
52 
53  - Every node joining in creates stones and links them appropriately
54  - According to the number of the node joining in, it will either be an intermediate node or a leaf node
55  - Leaf nodes will have terminal actions associated with them and intermidate nodes will have split actions (as of now)
56  - When appropriate number of nodes have joined in, the graph is realized
57  - The above three points can be implemented by basically unrolling the loops in the static tree_test code
58 
59  - Timing measurements can be made by starting the timer in the node_register_handler only when the 2nd node for reconfiguration joins in
60  - The timer will be stopped when the graph has been realized, mostly inside node_register_handler
61 
62  ****** ********* ****** */
63 
64 static void
join_handler(EVdfg dfg,char * identifier,void * available_sources,void * available_sinks)65 join_handler(EVdfg dfg, char *identifier, void* available_sources, void *available_sinks)
66 {
67 
68     static int joined_node_count;
69     static int i = 2;
70     static int j;
71     static int k;
72 
73     char *thandle;
74     char *canon_name = malloc(20 * sizeof(char));
75 
76     if (strcmp(identifier, "origin") == 0) {
77 	EVdfg_stone terminal = NULL;
78 
79 	canon_name = strdup("origin");
80 	EVmaster_assign_canonical_name(dfg, identifier, canon_name);
81 
82 	EVclient_register_sink_handler(cm, "thandler", simple_format_list, (EVSimpleHandlerFunc) simple_handler);
83 	terminal = EVdfg_create_sink_stone(dfg, "thandler");
84 
85 	EVdfg_link_port(last[0], 0, terminal);
86 	EVdfg_assign_node(last[0], "origin");
87 	EVdfg_assign_node(terminal, "origin");
88 
89 	++joined_node_count;
90 	EVdfg_realize(dfg);
91     } else {
92 	if (strcmp(identifier, "forker") == 0) {
93 	    //      EVdfg_reconfig_delete_link(dfg, 0, 1);
94 
95 	    //      canon_name = malloc(20 * sizeof(char));
96 	    ++joined_node_count;
97 
98 	    //      delayed_fork_children(cm, &reconfig_list[0], str_contact, 5);
99 	} else {
100 	    if (i < level_count) {
101 		//        if (j < nbase) {
102 		tmp[j] = EVdfg_create_stone(dfg, NULL);
103 
104 		sprintf(canon_name, "client%d", joined_node_count);
105 		EVmaster_assign_canonical_name(dfg, identifier, canon_name);
106 		EVdfg_assign_node(tmp[j], canon_name);
107 
108 		EVdfg_reconfig_link_port(last[j / base], j % base, tmp[j], NULL);
109 
110 		++joined_node_count;
111 		++j;
112 		//        }
113 		if (j >= nbase) {
114 		    for (j = 0; j < nbase; ++j) {
115 			last[j] = tmp[j];
116 		    }
117 		    nbase *= base;
118 		    j = 0;
119 		    ++i;
120 		}
121 	    } else {
122 		thandle = strdup("thandler");
123 		//	sprintf(thandle, "thandler%d", joined_node_count);
124 		EVclient_register_sink_handler(cm, thandle, simple_format_list, (EVSimpleHandlerFunc) simple_handler);
125 		tmp[k] = EVdfg_create_sink_stone(dfg, thandle);
126 
127 		sprintf(canon_name, "terminal_node%d", joined_node_count);
128 		EVmaster_assign_canonical_name(dfg, identifier, canon_name);
129 		EVdfg_assign_node(tmp[k], canon_name);
130 
131 		EVdfg_reconfig_link_port(last[k / base], k % base, tmp[k], NULL);
132 
133 		++k;
134 		++joined_node_count;
135 		if (k >= nbase) {
136 		    EVdfg_realize(dfg);
137 		}
138 	    }
139 	}
140     }
141 }
142 
143 
144 extern int
be_test_master(int argc,char ** argv)145 be_test_master(int argc, char **argv)
146 {
147     attr_list contact_list;
148     EVsource source_handle;
149     int last_row_size;
150     int i;
151     char **nodes;
152 
153     nodes = malloc(2 * sizeof(nodes[0]));
154 
155     nodes[0] = strdup("origin");
156     nodes[1] = NULL;
157 
158     last_row_size = (int) pow(base, level_count - 1);
159 
160     tmp = malloc(last_row_size * sizeof(EVdfg_stone));
161     last = malloc(last_row_size * sizeof(EVdfg_stone));
162 
163     cm = CManager_create();
164     CMlisten(cm);
165     contact_list = CMget_contact_list(cm);
166     str_contact = attr_list_to_string(contact_list);
167 
168     printf("\nMaster's contact = %s\n", str_contact);
169     fflush(stdout);
170 
171     source_handle = EVcreate_submit_handle(cm, DFG_SOURCE ,simple_format_list);
172     source_capabilities = EVclient_register_source("master_source", source_handle);
173 
174     dfg = EVdfg_create(cm);
175     EVmaster_node_join_handler(dfg, join_handler);;
176 
177     last[0] = EVdfg_create_source_stone(dfg, "master_source");
178 
179     /* pprabhu: creating list of reconfiguration node names */
180     reconfig_list = malloc(sizeof(reconfig_list[0]) * (reconfig_node_count + 2));
181     for (i=0; i < (reconfig_node_count + 1); i++) {
182         reconfig_list[i] = strdup("client");
183     }
184     reconfig_list[reconfig_node_count + 1] = NULL;
185 
186     EVdfg_join_dfg(dfg, "origin", str_contact);
187 
188     if (EVclient_ready_wait(dfg) != 1) {
189 	/* dfg initialization failed! */
190 	exit(1);
191     }
192 
193 
194     if (EVclient_active_sink_count(dfg) == 0) {
195 	EVclient_ready_for_shutdown(dfg);
196     }
197 
198     if (EVclient_source_active(source_handle)) {
199 	simple_rec rec;
200 	for (i = 0; i < 300; ++i) {
201 	    CMsleep(cm, 1);
202 	    generate_simple_record(&rec);
203 	    /* submit would be quietly ignored if source is not active */
204 	    EVsubmit(source_handle, &rec, NULL);
205 	}
206     }
207 
208     status = EVclient_wait_for_shutdown(dfg);
209 
210     wait_for_children(nodes);
211 
212     CManager_close(cm);
213     return status;
214 }
215 
216 
217 extern int
be_test_child(int argc,char ** argv)218 be_test_child(int argc, char **argv)
219 {
220     CManager cm;
221     EVsource src;
222     char *chandle;
223 
224     int i;
225 
226     cm = CManager_create();
227 
228     if (argc != 3) {
229         printf("Child usage:  evtest  <nodename> <mastercontact>\n");
230         exit(1);
231     }
232 
233     dfg = EVdfg_create(cm);
234 
235 
236     reconfig_list = malloc(sizeof(reconfig_list[0]) * (reconfig_node_count + 2));
237     for (i=0; i < (reconfig_node_count + 1); i++) {
238         reconfig_list[i] = strdup("client");
239     }
240     reconfig_list[reconfig_node_count + 1] = NULL;
241 
242     src = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
243     source_capabilities = EVclient_register_source("master_source", src);
244     chandle = strdup("thandler");
245 
246     sink_capabilities = EVclient_register_sink_handler(cm,chandle, simple_format_list,
247                                 (EVSimpleHandlerFunc) simple_handler);
248     EVdfg_join_dfg(dfg, argv[1], argv[2]);
249 
250     if (strcmp(argv[1], "forker") == 0) {
251 		test_fork_children(&reconfig_list[0], argv[2]);
252     }
253 
254     EVclient_ready_wait(dfg);
255     if (EVclient_active_sink_count(dfg) == 0) {
256         EVclient_ready_for_shutdown(dfg);
257     }
258 
259     if (EVclient_source_active(src)) {
260         simple_rec rec;
261         generate_simple_record(&rec);
262         /* submit will be quietly ignored if source is not active */
263         EVsubmit(src, &rec, NULL);
264     }
265     return EVclient_wait_for_shutdown(dfg);
266 }
267