1 /*
2  *  Test the ability of a CoD handler to reference other stones by name, see notes below for details
3  */
4 
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #include "config.h"
10 #include "evpath.h"
11 #include "ev_dfg.h"
12 #include "test_support.h"
13 
14 typedef struct _result {
15     int tally;
16 } result, *result_ptr;
17 
18 static FMField tally_list[] =
19 {
20     {"tally", "integer",
21      sizeof(int), FMOffset(result_ptr, tally)},
22     {NULL, NULL, 0, 0}
23 };
24 
25 static FMStructDescRec result_format_list[] =
26 {
27     {"result", tally_list, sizeof(result), NULL},
28     {NULL, NULL, 0, NULL}
29 };
30 
31 static int status;
32 static EVclient test_client;
33 
34 static
35 int
event_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)36 event_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
37 {
38     EVstone this_stone;
39     attr_list stone_attrs;
40     simple_rec_ptr event = vevent;
41     int message_count = 0;
42     static atom_t MSG_COUNT_ATOM = -1;
43     (void)cm;
44     (void)client_data;
45     if (quiet <= 0) printf("In handler for stone %d\n", EVexecuting_stone(cm));
46     if (MSG_COUNT_ATOM == -1) {
47 	MSG_COUNT_ATOM = attr_atom_from_string("MSG_COUNT");
48     }
49     checksum_simple_record(event, attrs, quiet);
50     /* get handle to the terminal stone */
51     this_stone = EVexecuting_stone(cm);
52     stone_attrs = EVextract_attr_list(cm, this_stone);
53     /* get the message count attribute */
54     if (!get_int_attr(stone_attrs, MSG_COUNT_ATOM, &message_count)) {
55 	message_count = 0;
56     }
57     /* increment the message count */
58     message_count++;
59     /* set the message count attribute */
60     set_int_attr(stone_attrs, MSG_COUNT_ATOM, message_count);
61     return 0;
62 }
63 
64 static
65 int
result_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)66 result_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
67 {
68     result_ptr event = vevent;
69     (void)cm;
70     if (quiet <= 0) {
71 	printf("In the handler, event data is :\n");
72 	printf("	tally = %d\n", event->tally);
73 	printf("Data was received with attributes : \n");
74 	if (attrs) dump_attr_list(attrs);
75     }
76     if (event->tally > 10) {
77 	static int shutdown_done = 0;
78 	if (!shutdown_done) {
79 	    EVclient_force_shutdown(test_client, 0);
80 	    shutdown_done++;
81 	}
82     }
83     return 0;
84 }
85 
86 extern int
be_test_master(int argc,char ** argv)87 be_test_master(int argc, char **argv)
88 {
89     char *nodes[] = {"a", "b", NULL};
90     CManager cm;
91     char *str_contact;
92     EVdfg_stone S1, T1, A1, T2;
93     EVsource source_handle;
94     EVmaster test_master;
95     EVdfg test_dfg;
96     EVclient_sinks sink_capabilities;
97     EVclient_sources source_capabilities;
98     char *A1_action_spec;
99     attr_list T1_name_attrs;
100     (void)argc; (void)argv;
101     cm = CManager_create();
102     CMlisten(cm);
103 
104 /*
105 **  LOCAL DFG SUPPORT   Sources and sinks that might or might not be utilized.
106 */
107 
108     source_handle = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
109     source_capabilities = EVclient_register_source("master_source", source_handle);
110     (void)EVclient_register_sink_handler(cm, "event_handler", simple_format_list,
111 				(EVSimpleHandlerFunc) event_handler, NULL);
112     sink_capabilities = EVclient_register_sink_handler(cm, "result_handler", result_format_list,
113 				(EVSimpleHandlerFunc) result_handler, NULL);
114 
115 /*
116 **  MASTER and DFG CREATION
117 */
118     test_master = EVmaster_create(cm);
119     str_contact = EVmaster_get_contact_list(test_master);
120     EVmaster_register_node_list(test_master, &nodes[0]);
121 
122     /* create:
123        - one source (S1) to generate a series of events.
124        - one terminal stone (T1) to catch those events and update it's own
125          attribute list.  This stone is "named", so it's attribute list is
126          accessible from other stones.
127        - one auto-stone (A1, a transform stone with automatic event submission
128          at a particular frequency) which "monitors" the status of the
129          terminal stone by watching its attributes.  When it notes a
130          termination condition, it generates a "result" event.
131        - a terminal stone (T2) that will catch the final event and report the result.
132 
133        In this arrangement, the initial source (S1) is linked to the first
134        terminal stone (T1) and the autostone (A1) is linked to the final
135        terminal stone (T2).  The actual monitoring occurs via CoD-based
136        access from A1 to the attribute lists of T1.  These stones *MUST* be
137        co-located for this to occur.  (Stone names are local to a node, and
138        direct attribute access is only supported between stones on a single
139        node.)
140     */
141 
142 char *COD_monitor = "{\n\
143     attr_list tmp = EVget_stone_attrs(\"my_data_stone\");\n	\
144     int message_count = attr_ivalue(tmp, \"MSG_COUNT\");\n	\
145     output.tally = message_count;\n\
146 \n\
147     return 1;\n\
148 }";
149 
150 
151     test_dfg = EVdfg_create(test_master);
152     S1 = EVdfg_create_source_stone(test_dfg, "master_source");
153     EVdfg_assign_node(S1, "b");
154     T1 = EVdfg_create_sink_stone(test_dfg, "event_handler");
155     T1_name_attrs = create_attr_list();
156     set_string_attr(T1_name_attrs, attr_atom_from_string("EVP_STONE_NAME"), strdup("my_data_stone"));
157     EVdfg_set_attr_list(T1, T1_name_attrs);  /* this one will cause reconfig */
158     free_attr_list(T1_name_attrs);
159     T2 = EVdfg_create_sink_stone(test_dfg, "result_handler");
160 
161     A1_action_spec = create_transform_action_spec(NULL, result_format_list, COD_monitor);
162     A1 = EVdfg_create_stone(test_dfg, A1_action_spec);
163     free(A1_action_spec);
164     EVdfg_assign_node(S1, "b");
165     EVdfg_assign_node(T1, "a");
166     EVdfg_assign_node(A1, "a");
167     EVdfg_assign_node(T2, "b");
168     EVdfg_link_dest(S1, T1);
169     EVdfg_link_dest(A1, T2);
170     EVdfg_enable_auto_stone(A1, 1, 0);  /* 1 sec intervals for monitoring */
171 
172     EVdfg_realize(test_dfg);
173 
174 /* We're node 0 in the process group */
175     test_client = EVclient_assoc_local(cm, nodes[0], test_master, source_capabilities, sink_capabilities);
176 
177 /* Fork the others */
178     test_fork_children(&nodes[0], str_contact);
179 
180     free(str_contact);
181     if (EVclient_ready_wait(test_client) != 1) {
182 	/* dfg initialization failed! */
183 	exit(1);
184     }
185 
186     if (EVclient_source_active(source_handle)) {
187 	while (!EVclient_test_for_shutdown(test_client)) {
188 	    simple_rec rec;
189 	    generate_simple_record(&rec);
190 	    /* submit would be quietly ignored if source is not active */
191 	    EVsubmit(source_handle, &rec, NULL);
192 	    CMusleep(cm, 999999); /* wait for .1 sec */
193 	}
194 	/* exit if shutdown is imminent */
195     }
196 
197     status = EVclient_wait_for_shutdown(test_client);
198 
199     wait_for_children(nodes);
200 
201     EVfree_source(source_handle);
202     CManager_close(cm);
203     return status;
204 }
205 
206 
207 extern int
be_test_child(int argc,char ** argv)208 be_test_child(int argc, char **argv)
209 {
210     CManager cm;
211     EVsource src;
212     EVclient_sinks sink_capabilities;
213     EVclient_sources source_capabilities;
214 
215     cm = CManager_create();
216     if (argc != 3) {
217 	printf("Child usage:  multi_sink  <nodename> <mastercontact>\n");
218 	exit(1);
219     }
220 
221     src = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
222     source_capabilities = EVclient_register_source("master_source", src);
223     (void)EVclient_register_sink_handler(cm, "event_handler", simple_format_list,
224 				(EVSimpleHandlerFunc) event_handler, NULL);
225     sink_capabilities = EVclient_register_sink_handler(cm, "result_handler", result_format_list,
226 				(EVSimpleHandlerFunc) result_handler, NULL);
227     test_client = EVclient_assoc(cm, argv[1], argv[2], source_capabilities, sink_capabilities);
228     EVclient_ready_wait(test_client);
229 
230     if (EVclient_source_active(src)) {
231 	while (!EVclient_test_for_shutdown(test_client)) {
232 	    simple_rec rec;
233 	    generate_simple_record(&rec);
234 	    /* submit would be quietly ignored if source is not active */
235 	    EVsubmit(src, &rec, NULL);
236 	    CMusleep(cm, 999999); /* wait for .1 sec */
237 	}
238 	/* exit if shutdown is imminent */
239     }
240 
241     EVfree_source(src);
242     return EVclient_wait_for_shutdown(test_client);
243 }
244