1 /*
2 * Test the ability of a CoD handler to reference other stones by name, see notes below for details
3 */
4
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8
9 #include "config.h"
10 #include "evpath.h"
11 #include "ev_dfg.h"
12 #include "test_support.h"
13
/*
 * Payload of the "result" event: a single integer tally.
 */
typedef struct _result {
    int tally;
} result;

/* Pointer alias used when describing the record layout and in handlers. */
typedef struct _result *result_ptr;
17
18 static FMField tally_list[] =
19 {
20 {"tally", "integer",
21 sizeof(int), FMOffset(result_ptr, tally)},
22 {NULL, NULL, 0, 0}
23 };
24
25 static FMStructDescRec result_format_list[] =
26 {
27 {"result", tally_list, sizeof(result), NULL},
28 {NULL, NULL, 0, NULL}
29 };
30
31 static int status;
32 static EVclient test_client;
33
34 static
35 int
event_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)36 event_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
37 {
38 EVstone this_stone;
39 attr_list stone_attrs;
40 simple_rec_ptr event = vevent;
41 int message_count = 0;
42 static atom_t MSG_COUNT_ATOM = -1;
43 (void)cm;
44 (void)client_data;
45 if (quiet <= 0) printf("In handler for stone %d\n", EVexecuting_stone(cm));
46 if (MSG_COUNT_ATOM == -1) {
47 MSG_COUNT_ATOM = attr_atom_from_string("MSG_COUNT");
48 }
49 checksum_simple_record(event, attrs, quiet);
50 /* get handle to the terminal stone */
51 this_stone = EVexecuting_stone(cm);
52 stone_attrs = EVextract_attr_list(cm, this_stone);
53 /* get the message count attribute */
54 if (!get_int_attr(stone_attrs, MSG_COUNT_ATOM, &message_count)) {
55 message_count = 0;
56 }
57 /* increment the message count */
58 message_count++;
59 /* set the message count attribute */
60 set_int_attr(stone_attrs, MSG_COUNT_ATOM, message_count);
61 return 0;
62 }
63
64 static
65 int
result_handler(CManager cm,void * vevent,void * client_data,attr_list attrs)66 result_handler(CManager cm, void *vevent, void *client_data, attr_list attrs)
67 {
68 result_ptr event = vevent;
69 (void)cm;
70 if (quiet <= 0) {
71 printf("In the handler, event data is :\n");
72 printf(" tally = %d\n", event->tally);
73 printf("Data was received with attributes : \n");
74 if (attrs) dump_attr_list(attrs);
75 }
76 if (event->tally > 10) {
77 static int shutdown_done = 0;
78 if (!shutdown_done) {
79 EVclient_force_shutdown(test_client, 0);
80 shutdown_done++;
81 }
82 }
83 return 0;
84 }
85
86 extern int
be_test_master(int argc,char ** argv)87 be_test_master(int argc, char **argv)
88 {
89 char *nodes[] = {"a", "b", NULL};
90 CManager cm;
91 char *str_contact;
92 EVdfg_stone S1, T1, A1, T2;
93 EVsource source_handle;
94 EVmaster test_master;
95 EVdfg test_dfg;
96 EVclient_sinks sink_capabilities;
97 EVclient_sources source_capabilities;
98 char *A1_action_spec;
99 attr_list T1_name_attrs;
100 (void)argc; (void)argv;
101 cm = CManager_create();
102 CMlisten(cm);
103
104 /*
105 ** LOCAL DFG SUPPORT Sources and sinks that might or might not be utilized.
106 */
107
108 source_handle = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
109 source_capabilities = EVclient_register_source("master_source", source_handle);
110 (void)EVclient_register_sink_handler(cm, "event_handler", simple_format_list,
111 (EVSimpleHandlerFunc) event_handler, NULL);
112 sink_capabilities = EVclient_register_sink_handler(cm, "result_handler", result_format_list,
113 (EVSimpleHandlerFunc) result_handler, NULL);
114
115 /*
116 ** MASTER and DFG CREATION
117 */
118 test_master = EVmaster_create(cm);
119 str_contact = EVmaster_get_contact_list(test_master);
120 EVmaster_register_node_list(test_master, &nodes[0]);
121
122 /* create:
123 - one source (S1) to generate a series of events.
124 - one terminal stone (T1) to catch those events and update it's own
125 attribute list. This stone is "named", so it's attribute list is
126 accessible from other stones.
127 - one auto-stone (A1, a transform stone with automatic event submission
128 at a particular frequency) which "monitors" the status of the
129 terminal stone by watching its attributes. When it notes a
130 termination condition, it generates a "result" event.
131 - a terminal stone (T2) that will catch the final event and report the result.
132
133 In this arrangement, the initial source (S1) is linked to the first
134 terminal stone (T1) and the autostone (A1) is linked to the final
135 terminal stone (T2). The actual monitoring occurs via CoD-based
136 access from A1 to the attribute lists of T1. These stones *MUST* be
137 co-located for this to occur. (Stone names are local to a node, and
138 direct attribute access is only supported between stones on a single
139 node.)
140 */
141
142 char *COD_monitor = "{\n\
143 attr_list tmp = EVget_stone_attrs(\"my_data_stone\");\n \
144 int message_count = attr_ivalue(tmp, \"MSG_COUNT\");\n \
145 output.tally = message_count;\n\
146 \n\
147 return 1;\n\
148 }";
149
150
151 test_dfg = EVdfg_create(test_master);
152 S1 = EVdfg_create_source_stone(test_dfg, "master_source");
153 EVdfg_assign_node(S1, "b");
154 T1 = EVdfg_create_sink_stone(test_dfg, "event_handler");
155 T1_name_attrs = create_attr_list();
156 set_string_attr(T1_name_attrs, attr_atom_from_string("EVP_STONE_NAME"), strdup("my_data_stone"));
157 EVdfg_set_attr_list(T1, T1_name_attrs); /* this one will cause reconfig */
158 free_attr_list(T1_name_attrs);
159 T2 = EVdfg_create_sink_stone(test_dfg, "result_handler");
160
161 A1_action_spec = create_transform_action_spec(NULL, result_format_list, COD_monitor);
162 A1 = EVdfg_create_stone(test_dfg, A1_action_spec);
163 free(A1_action_spec);
164 EVdfg_assign_node(S1, "b");
165 EVdfg_assign_node(T1, "a");
166 EVdfg_assign_node(A1, "a");
167 EVdfg_assign_node(T2, "b");
168 EVdfg_link_dest(S1, T1);
169 EVdfg_link_dest(A1, T2);
170 EVdfg_enable_auto_stone(A1, 1, 0); /* 1 sec intervals for monitoring */
171
172 EVdfg_realize(test_dfg);
173
174 /* We're node 0 in the process group */
175 test_client = EVclient_assoc_local(cm, nodes[0], test_master, source_capabilities, sink_capabilities);
176
177 /* Fork the others */
178 test_fork_children(&nodes[0], str_contact);
179
180 free(str_contact);
181 if (EVclient_ready_wait(test_client) != 1) {
182 /* dfg initialization failed! */
183 exit(1);
184 }
185
186 if (EVclient_source_active(source_handle)) {
187 while (!EVclient_test_for_shutdown(test_client)) {
188 simple_rec rec;
189 generate_simple_record(&rec);
190 /* submit would be quietly ignored if source is not active */
191 EVsubmit(source_handle, &rec, NULL);
192 CMusleep(cm, 999999); /* wait for .1 sec */
193 }
194 /* exit if shutdown is imminent */
195 }
196
197 status = EVclient_wait_for_shutdown(test_client);
198
199 wait_for_children(nodes);
200
201 EVfree_source(source_handle);
202 CManager_close(cm);
203 return status;
204 }
205
206
207 extern int
be_test_child(int argc,char ** argv)208 be_test_child(int argc, char **argv)
209 {
210 CManager cm;
211 EVsource src;
212 EVclient_sinks sink_capabilities;
213 EVclient_sources source_capabilities;
214
215 cm = CManager_create();
216 if (argc != 3) {
217 printf("Child usage: multi_sink <nodename> <mastercontact>\n");
218 exit(1);
219 }
220
221 src = EVcreate_submit_handle(cm, DFG_SOURCE, simple_format_list);
222 source_capabilities = EVclient_register_source("master_source", src);
223 (void)EVclient_register_sink_handler(cm, "event_handler", simple_format_list,
224 (EVSimpleHandlerFunc) event_handler, NULL);
225 sink_capabilities = EVclient_register_sink_handler(cm, "result_handler", result_format_list,
226 (EVSimpleHandlerFunc) result_handler, NULL);
227 test_client = EVclient_assoc(cm, argv[1], argv[2], source_capabilities, sink_capabilities);
228 EVclient_ready_wait(test_client);
229
230 if (EVclient_source_active(src)) {
231 while (!EVclient_test_for_shutdown(test_client)) {
232 simple_rec rec;
233 generate_simple_record(&rec);
234 /* submit would be quietly ignored if source is not active */
235 EVsubmit(src, &rec, NULL);
236 CMusleep(cm, 999999); /* wait for .1 sec */
237 }
238 /* exit if shutdown is imminent */
239 }
240
241 EVfree_source(src);
242 return EVclient_wait_for_shutdown(test_client);
243 }
244